Introduced method Request.getConnection() to expose the Connection after at the request begin event. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
fcf50ff624
commit
efda646319
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.io.CyclicTimeouts;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
|
@ -53,7 +54,7 @@ public abstract class HttpChannel implements CyclicTimeouts.Expirable
|
|||
{
|
||||
boolean result = false;
|
||||
boolean abort = true;
|
||||
try (AutoLock l = _lock.lock())
|
||||
try (AutoLock ignored = _lock.lock())
|
||||
{
|
||||
if (_exchange == null)
|
||||
{
|
||||
|
@ -64,12 +65,14 @@ public abstract class HttpChannel implements CyclicTimeouts.Expirable
|
|||
}
|
||||
}
|
||||
|
||||
HttpRequest request = exchange.getRequest();
|
||||
if (abort)
|
||||
{
|
||||
exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported"));
|
||||
request.abort(new UnsupportedOperationException("Pipelined requests not supported"));
|
||||
}
|
||||
else
|
||||
{
|
||||
request.setConnection(getConnection());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} associated {} to {}", exchange, result, this);
|
||||
}
|
||||
|
@ -80,7 +83,7 @@ public abstract class HttpChannel implements CyclicTimeouts.Expirable
|
|||
public boolean disassociate(HttpExchange exchange)
|
||||
{
|
||||
boolean result = false;
|
||||
try (AutoLock l = _lock.lock())
|
||||
try (AutoLock ignored = _lock.lock())
|
||||
{
|
||||
HttpExchange existing = _exchange;
|
||||
_exchange = null;
|
||||
|
@ -98,12 +101,14 @@ public abstract class HttpChannel implements CyclicTimeouts.Expirable
|
|||
|
||||
public HttpExchange getHttpExchange()
|
||||
{
|
||||
try (AutoLock l = _lock.lock())
|
||||
try (AutoLock ignored = _lock.lock())
|
||||
{
|
||||
return _exchange;
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Connection getConnection();
|
||||
|
||||
@Override
|
||||
public long getExpireNanoTime()
|
||||
{
|
||||
|
|
|
@ -15,6 +15,7 @@ package org.eclipse.jetty.client;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -277,6 +278,18 @@ public class HttpProxy extends ProxyConfiguration.Proxy
|
|||
this.promise = promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getLocalSocketAddress()
|
||||
{
|
||||
return connection.getLocalSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getRemoteSocketAddress()
|
||||
{
|
||||
return connection.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Request request, Response.CompleteListener listener)
|
||||
{
|
||||
|
|
|
@ -41,6 +41,7 @@ import java.util.function.Consumer;
|
|||
import java.util.function.LongConsumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.ContentProvider;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
|
@ -69,6 +70,7 @@ public class HttpRequest implements Request
|
|||
private final AtomicReference<Throwable> aborted = new AtomicReference<>();
|
||||
private final HttpClient client;
|
||||
private final HttpConversation conversation;
|
||||
private Connection connection;
|
||||
private String scheme;
|
||||
private String host;
|
||||
private int port;
|
||||
|
@ -162,6 +164,17 @@ public class HttpRequest implements Request
|
|||
return conversation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection()
|
||||
{
|
||||
return connection;
|
||||
}
|
||||
|
||||
void setConnection(Connection connection)
|
||||
{
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getScheme()
|
||||
{
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package org.eclipse.jetty.client.api;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
|
||||
|
@ -46,4 +47,20 @@ public interface Connection extends Closeable
|
|||
* @see #close()
|
||||
*/
|
||||
boolean isClosed();
|
||||
|
||||
/**
|
||||
* @return the local socket address associated with the connection
|
||||
*/
|
||||
default SocketAddress getLocalSocketAddress()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the remote socket address associated with the connection
|
||||
*/
|
||||
default SocketAddress getRemoteSocketAddress()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,22 @@ import org.eclipse.jetty.util.Fields;
|
|||
*/
|
||||
public interface Request
|
||||
{
|
||||
/**
|
||||
* <p>Returns the connection associated with this request.</p>
|
||||
* <p>The connection is available only starting from the
|
||||
* {@link #onRequestBegin(BeginListener) request begin} event,
|
||||
* when a connection is associated with the request to be sent,
|
||||
* otherwise {@code null} is returned.</p>
|
||||
*
|
||||
* @return the connection associated with this request,
|
||||
* or {@code null} if there is no connection associated
|
||||
* with this request
|
||||
*/
|
||||
default Connection getConnection()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the URI scheme of this request, such as "http" or "https"
|
||||
*/
|
||||
|
|
|
@ -17,6 +17,7 @@ import java.util.concurrent.atomic.LongAdder;
|
|||
|
||||
import org.eclipse.jetty.client.HttpChannel;
|
||||
import org.eclipse.jetty.client.HttpExchange;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
|
@ -55,6 +56,12 @@ public class HttpChannelOverHTTP extends HttpChannel
|
|||
return new HttpReceiverOverHTTP(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection getConnection()
|
||||
{
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpSenderOverHTTP getHttpSender()
|
||||
{
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.client.http;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.util.Collections;
|
||||
|
@ -100,6 +101,18 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
|
|||
return delegate.getHttpDestination();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getLocalSocketAddress()
|
||||
{
|
||||
return delegate.getLocalSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getRemoteSocketAddress()
|
||||
{
|
||||
return delegate.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBytesIn()
|
||||
{
|
||||
|
@ -285,6 +298,18 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
|
|||
return Collections.<HttpChannel>singleton(channel).iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getLocalSocketAddress()
|
||||
{
|
||||
return getEndPoint().getLocalSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getRemoteSocketAddress()
|
||||
{
|
||||
return getEndPoint().getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SendFailure send(HttpExchange exchange)
|
||||
{
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
|
||||
import org.eclipse.jetty.client.util.AsyncRequestContent;
|
||||
|
@ -68,26 +69,16 @@ public class HttpClientFailureTest
|
|||
{
|
||||
startServer(new EmptyServerHandler());
|
||||
|
||||
final AtomicReference<HttpConnectionOverHTTP> connectionRef = new AtomicReference<>();
|
||||
client = new HttpClient(new HttpClientTransportOverHTTP(1)
|
||||
{
|
||||
@Override
|
||||
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
|
||||
{
|
||||
HttpConnectionOverHTTP connection = (HttpConnectionOverHTTP)super.newConnection(endPoint, context);
|
||||
connectionRef.set(connection);
|
||||
return connection;
|
||||
}
|
||||
});
|
||||
client = new HttpClient(new HttpClientTransportOverHTTP(1));
|
||||
client.start();
|
||||
|
||||
assertThrows(ExecutionException.class, () ->
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.onRequestHeaders(request -> connectionRef.get().getEndPoint().close())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send());
|
||||
Request request = client.newRequest("localhost", connector.getLocalPort())
|
||||
.onRequestHeaders(r -> r.getConnection().close())
|
||||
.timeout(5, TimeUnit.SECONDS);
|
||||
assertThrows(ExecutionException.class, request::send);
|
||||
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool();
|
||||
HttpDestination destination = (HttpDestination)client.resolveDestination(request);
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
assertEquals(0, connectionPool.getConnectionCount());
|
||||
assertEquals(0, connectionPool.getActiveConnections().size());
|
||||
assertEquals(0, connectionPool.getIdleConnections().size());
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.eclipse.jetty.client.HttpChannel;
|
|||
import org.eclipse.jetty.client.HttpExchange;
|
||||
import org.eclipse.jetty.client.HttpReceiver;
|
||||
import org.eclipse.jetty.client.HttpSender;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.fcgi.generator.Flusher;
|
||||
import org.eclipse.jetty.fcgi.generator.Generator;
|
||||
|
@ -43,7 +44,7 @@ public class HttpChannelOverFCGI extends HttpChannel
|
|||
private int request;
|
||||
private HttpVersion version;
|
||||
|
||||
public HttpChannelOverFCGI(final HttpConnectionOverFCGI connection, Flusher flusher, long idleTimeout)
|
||||
public HttpChannelOverFCGI(HttpConnectionOverFCGI connection, Flusher flusher, long idleTimeout)
|
||||
{
|
||||
super(connection.getHttpDestination());
|
||||
this.connection = connection;
|
||||
|
@ -63,6 +64,12 @@ public class HttpChannelOverFCGI extends HttpChannel
|
|||
this.request = request;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection getConnection()
|
||||
{
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpSender getHttpSender()
|
||||
{
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package org.eclipse.jetty.fcgi.client.http;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.util.Collections;
|
||||
|
@ -87,6 +88,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
return destination;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getLocalSocketAddress()
|
||||
{
|
||||
return delegate.getLocalSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getRemoteSocketAddress()
|
||||
{
|
||||
return delegate.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
protected Flusher getFlusher()
|
||||
{
|
||||
return flusher;
|
||||
|
@ -319,7 +332,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
|
||||
private int acquireRequest()
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
int last = requests.getLast();
|
||||
int request = last + 1;
|
||||
|
@ -330,7 +343,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
|
||||
private void releaseRequest(int request)
|
||||
{
|
||||
try (AutoLock l = lock.lock())
|
||||
try (AutoLock ignored = lock.lock())
|
||||
{
|
||||
requests.removeFirstOccurrence(request);
|
||||
}
|
||||
|
@ -373,6 +386,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
return channel == null ? Collections.emptyIterator() : Collections.singleton(channel).iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getLocalSocketAddress()
|
||||
{
|
||||
return getEndPoint().getLocalSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getRemoteSocketAddress()
|
||||
{
|
||||
return getEndPoint().getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SendFailure send(HttpExchange exchange)
|
||||
{
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.eclipse.jetty.client.HttpDestination;
|
|||
import org.eclipse.jetty.client.HttpExchange;
|
||||
import org.eclipse.jetty.client.HttpReceiver;
|
||||
import org.eclipse.jetty.client.HttpSender;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.HTTP2Channel;
|
||||
|
@ -67,6 +68,12 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
|||
return listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection getConnection()
|
||||
{
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpSender getHttpSender()
|
||||
{
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.http2.client.http;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -68,6 +69,18 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
|||
return session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getLocalSocketAddress()
|
||||
{
|
||||
return session.getLocalSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getRemoteSocketAddress()
|
||||
{
|
||||
return session.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
public boolean isRecycleHttpChannels()
|
||||
{
|
||||
return recycleHttpChannels;
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.eclipse.jetty.client.HttpDestination;
|
|||
import org.eclipse.jetty.client.HttpExchange;
|
||||
import org.eclipse.jetty.client.HttpReceiver;
|
||||
import org.eclipse.jetty.client.HttpSender;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http3.api.Stream;
|
||||
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
|
||||
|
@ -50,6 +51,12 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
return receiver;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection getConnection()
|
||||
{
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpSender getHttpSender()
|
||||
{
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.http3.client.http.internal;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
@ -50,6 +51,18 @@ public class HttpConnectionOverHTTP3 extends HttpConnection implements Connectio
|
|||
return session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getLocalSocketAddress()
|
||||
{
|
||||
return session.getLocalSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketAddress getRemoteSocketAddress()
|
||||
{
|
||||
return session.getRemoteSocketAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxMultiplex()
|
||||
{
|
||||
|
|
|
@ -835,6 +835,25 @@ public class HttpClientTest extends AbstractTest<TransportScenario>
|
|||
assertEquals(200, response.getStatus());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testRequestConnection(Transport transport) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
|
||||
scenario.start(new EmptyServerHandler());
|
||||
|
||||
ContentResponse response = scenario.client.newRequest(scenario.newURI())
|
||||
.onRequestBegin(r ->
|
||||
{
|
||||
if (r.getConnection() == null)
|
||||
r.abort(new IllegalStateException());
|
||||
})
|
||||
.send();
|
||||
|
||||
assertEquals(200, response.getStatus());
|
||||
}
|
||||
|
||||
private void sleep(long time) throws IOException
|
||||
{
|
||||
try
|
||||
|
|
Loading…
Reference in New Issue