Merge branch 'jetty-9' into jetty-9-configuration

Conflicts:
	jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
This commit is contained in:
Greg Wilkins 2012-09-12 23:11:12 +10:00
commit 79caf45604
44 changed files with 2269 additions and 560 deletions

View File

@ -34,10 +34,10 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<!-- <!--
Required for OSGI Required for OSGI
--> -->
<plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId> <artifactId>maven-jar-plugin</artifactId>
<configuration> <configuration>

View File

@ -0,0 +1,152 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpHeader;
public class AuthenticationProtocolHandler extends Response.Listener.Adapter implements ProtocolHandler
{
private static final Pattern WWW_AUTHENTICATE_PATTERN = Pattern.compile("([^\\s]+)\\s+realm=\"([^\"]+)\"(\\s*,\\s*)?(.*)", Pattern.CASE_INSENSITIVE);
private final ResponseNotifier notifier = new ResponseNotifier();
private final HttpClient client;
public AuthenticationProtocolHandler(HttpClient client)
{
this.client = client;
}
@Override
public boolean accept(Request request, Response response)
{
return response.status() == 401;
}
@Override
public Response.Listener getResponseListener()
{
return this;
}
@Override
public void onComplete(Result result)
{
if (!result.isFailed())
{
List<WWWAuthenticate> wwwAuthenticates = parseWWWAuthenticate(result.getResponse());
if (wwwAuthenticates.isEmpty())
{
// TODO
}
else
{
Request request = result.getRequest();
final String uri = request.uri();
Authentication authentication = null;
for (WWWAuthenticate wwwAuthenticate : wwwAuthenticates)
{
authentication = client.getAuthenticationStore().findAuthentication(wwwAuthenticate.type, uri, wwwAuthenticate.realm);
if (authentication != null)
break;
}
if (authentication != null)
{
final Authentication authn = authentication;
authn.authenticate(request);
request.send(new Adapter()
{
@Override
public void onComplete(Result result)
{
if (!result.isFailed())
{
Authentication.Result authnResult = new Authentication.Result(uri, authn);
client.getAuthenticationStore().addAuthenticationResult(authnResult);
}
}
});
}
else
{
noAuthentication(request, result.getResponse());
}
}
}
}
private List<WWWAuthenticate> parseWWWAuthenticate(Response response)
{
List<WWWAuthenticate> result = new ArrayList<>();
List<String> values = Collections.list(response.headers().getValues(HttpHeader.WWW_AUTHENTICATE.asString()));
for (String value : values)
{
Matcher matcher = WWW_AUTHENTICATE_PATTERN.matcher(value);
if (matcher.matches())
{
String type = matcher.group(1);
String realm = matcher.group(2);
String params = matcher.group(4);
WWWAuthenticate wwwAuthenticate = new WWWAuthenticate(type, realm, params);
result.add(wwwAuthenticate);
}
}
return result;
}
private void noAuthentication(Request request, Response response)
{
HttpConversation conversation = client.getConversation(request);
Response.Listener listener = conversation.exchanges().peekFirst().listener();
notifier.notifyBegin(listener, response);
notifier.notifyHeaders(listener, response);
notifier.notifySuccess(listener, response);
// TODO: this call here is horrid, but needed... but here it is too late for the exchange
// TODO: to figure out that the conversation is finished, so we need to manually do it here, no matter what.
// TODO: However, we also need to make sure that requests are not resent with the same ID
// TODO: because here the connection has already been returned to the pool, so the "new" request may see
// TODO: the same conversation but it's not really the case.
// TODO: perhaps the factory for requests should be the conversation ?
conversation.complete();
notifier.notifyComplete(listener, new Result(request, response));
}
private class WWWAuthenticate
{
private final String type;
private final String realm;
private final String params;
public WWWAuthenticate(String type, String realm, String params)
{
this.type = type;
this.realm = realm;
this.params = params;
}
}
}

View File

@ -0,0 +1,80 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.AuthenticationStore;
public class HttpAuthenticationStore implements AuthenticationStore
{
private final List<Authentication> authentications = new CopyOnWriteArrayList<>();
private final Map<String, Authentication> results = new ConcurrentHashMap<>();
@Override
public void addAuthentication(Authentication authentication)
{
authentications.add(authentication);
}
@Override
public void removeAuthentication(Authentication authentication)
{
authentications.remove(authentication);
}
@Override
public Authentication findAuthentication(String type, String uri, String realm)
{
for (Authentication authentication : authentications)
{
if (authentication.matches(type, uri, realm))
return authentication;
}
return null;
}
@Override
public void addAuthenticationResult(Authentication.Result result)
{
results.put(result.getURI(), result.getAuthentication());
}
@Override
public void removeAuthenticationResults()
{
results.clear();
}
@Override
public Authentication findAuthenticationResult(String uri)
{
// TODO: I should match the longest URI
for (Map.Entry<String, Authentication> entry : results.entrySet())
{
if (uri.startsWith(entry.getKey()))
return entry.getValue();
}
return null;
}
}

View File

@ -30,10 +30,12 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.CookieStore; import org.eclipse.jetty.client.api.CookieStore;
@ -74,12 +76,12 @@ import org.eclipse.jetty.util.thread.TimerScheduler;
* *
* // Building a request with a timeout * // Building a request with a timeout
* HTTPClient client = new HTTPClient(); * HTTPClient client = new HTTPClient();
* Response response = client.newRequest("localhost:8080").send().get(5, TimeUnit.SECONDS); * Response response = client.newRequest("http://localhost:8080").send().get(5, TimeUnit.SECONDS);
* int status = response.status(); * int status = response.status();
* *
* // Asynchronously * // Asynchronously
* HTTPClient client = new HTTPClient(); * HTTPClient client = new HTTPClient();
* client.newRequest("localhost:8080").send(new Response.Listener.Adapter() * client.newRequest("http://localhost:8080").send(new Response.Listener.Adapter()
* { * {
* &#64;Override * &#64;Override
* public void onSuccess(Response response) * public void onSuccess(Response response)
@ -95,7 +97,10 @@ public class HttpClient extends AggregateLifeCycle
private final ConcurrentMap<String, HttpDestination> destinations = new ConcurrentHashMap<>(); private final ConcurrentMap<String, HttpDestination> destinations = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, HttpConversation> conversations = new ConcurrentHashMap<>(); private final ConcurrentMap<Long, HttpConversation> conversations = new ConcurrentHashMap<>();
private final List<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
private final List<Request.Listener> requestListeners = new CopyOnWriteArrayList<>();
private final CookieStore cookieStore = new HttpCookieStore(); private final CookieStore cookieStore = new HttpCookieStore();
private final AuthenticationStore authenticationStore = new HttpAuthenticationStore();
private volatile Executor executor; private volatile Executor executor;
private volatile ByteBufferPool byteBufferPool; private volatile ByteBufferPool byteBufferPool;
private volatile Scheduler scheduler; private volatile Scheduler scheduler;
@ -108,9 +113,20 @@ public class HttpClient extends AggregateLifeCycle
private volatile int maxQueueSizePerAddress = 1024; private volatile int maxQueueSizePerAddress = 1024;
private volatile int requestBufferSize = 4096; private volatile int requestBufferSize = 4096;
private volatile int responseBufferSize = 4096; private volatile int responseBufferSize = 4096;
private volatile int maxRedirects = 8;
private volatile SocketAddress bindAddress; private volatile SocketAddress bindAddress;
private volatile long idleTimeout; private volatile long idleTimeout;
public HttpClient()
{
this(null);
}
public HttpClient(Executor executor)
{
this.executor = executor;
}
public ByteBufferPool getByteBufferPool() public ByteBufferPool getByteBufferPool()
{ {
return byteBufferPool; return byteBufferPool;
@ -139,6 +155,9 @@ public class HttpClient extends AggregateLifeCycle
selectorManager = newSelectorManager(); selectorManager = newSelectorManager();
addBean(selectorManager); addBean(selectorManager);
handlers.add(new RedirectProtocolHandler(this));
handlers.add(new AuthenticationProtocolHandler(this));
super.doStart(); super.doStart();
LOG.info("Started {}", this); LOG.info("Started {}", this);
@ -166,11 +185,21 @@ public class HttpClient extends AggregateLifeCycle
LOG.info("Stopped {}", this); LOG.info("Stopped {}", this);
} }
public List<Request.Listener> getRequestListeners()
{
return requestListeners;
}
public CookieStore getCookieStore() public CookieStore getCookieStore()
{ {
return cookieStore; return cookieStore;
} }
public AuthenticationStore getAuthenticationStore()
{
return authenticationStore;
}
public long getIdleTimeout() public long getIdleTimeout()
{ {
return idleTimeout; return idleTimeout;
@ -224,9 +253,9 @@ public class HttpClient extends AggregateLifeCycle
return new HttpRequest(this, uri); return new HttpRequest(this, uri);
} }
protected Request newRequest(long id, URI uri) protected Request newRequest(long id, String uri)
{ {
return new HttpRequest(this, id, uri); return new HttpRequest(this, id, URI.create(uri));
} }
private String address(String scheme, String host, int port) private String address(String scheme, String host, int port)
@ -340,6 +369,16 @@ public class HttpClient extends AggregateLifeCycle
this.responseBufferSize = responseBufferSize; this.responseBufferSize = responseBufferSize;
} }
public int getMaxRedirects()
{
return maxRedirects;
}
public void setMaxRedirects(int maxRedirects)
{
this.maxRedirects = maxRedirects;
}
protected void newConnection(HttpDestination destination, Callback<Connection> callback) protected void newConnection(HttpDestination destination, Callback<Connection> callback)
{ {
SocketChannel channel = null; SocketChannel channel = null;
@ -376,7 +415,7 @@ public class HttpClient extends AggregateLifeCycle
} }
} }
public HttpConversation getConversation(Request request) protected HttpConversation getConversation(Request request)
{ {
long id = request.id(); long id = request.id();
HttpConversation conversation = conversations.get(id); HttpConversation conversation = conversations.get(id);
@ -387,27 +426,25 @@ public class HttpClient extends AggregateLifeCycle
if (existing != null) if (existing != null)
conversation = existing; conversation = existing;
else else
LOG.debug("Created {}", conversation); LOG.debug("{} created", conversation);
} }
return conversation; return conversation;
} }
public void removeConversation(HttpConversation conversation) protected void removeConversation(HttpConversation conversation)
{ {
conversations.remove(conversation.id()); conversations.remove(conversation.id());
LOG.debug("Removed {}", conversation); LOG.debug("{} removed", conversation);
} }
public Response.Listener lookup(int status) // TODO: find a better method name
public Response.Listener lookup(Request request, Response response)
{ {
// TODO for (ProtocolHandler handler : handlers)
switch (status)
{ {
case 302: if (handler.accept(request, response))
case 303: return handler.getResponseListener();
return new RedirectionProtocolListener(this);
} }
return null; return null;
} }

View File

@ -18,9 +18,11 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
@ -76,7 +78,7 @@ public class HttpConnection extends AbstractConnection implements Connection
{ {
LOG.debug("{} idle timeout", this); LOG.debug("{} idle timeout", this);
HttpExchange exchange = this.exchange.get(); HttpExchange exchange = getExchange();
if (exchange != null) if (exchange != null)
idleTimeout(); idleTimeout();
else else
@ -97,7 +99,8 @@ public class HttpConnection extends AbstractConnection implements Connection
HttpConversation conversation = client.getConversation(request); HttpConversation conversation = client.getConversation(request);
HttpExchange exchange = new HttpExchange(conversation, this, request, listener); HttpExchange exchange = new HttpExchange(conversation, this, request, listener);
setExchange(exchange); setExchange(exchange);
conversation.add(exchange); conversation.exchanges().offer(exchange);
conversation.listener(listener);
sender.send(exchange); sender.send(exchange);
} }
@ -157,6 +160,11 @@ public class HttpConnection extends AbstractConnection implements Connection
if (cookieString != null) if (cookieString != null)
request.header(HttpHeader.COOKIE.asString(), cookieString.toString()); request.header(HttpHeader.COOKIE.asString(), cookieString.toString());
// Authorization
Authentication authentication = client.getAuthenticationStore().findAuthenticationResult(request.uri());
if (authentication != null)
authentication.authenticate(request);
// TODO: decoder headers // TODO: decoder headers
// If we are HTTP 1.1, add the Host header // If we are HTTP 1.1, add the Host header
@ -201,22 +209,33 @@ public class HttpConnection extends AbstractConnection implements Connection
receiver.receive(); receiver.receive();
} }
public void completed(HttpExchange exchange, boolean success) public void complete(HttpExchange exchange, boolean success)
{ {
if (this.exchange.compareAndSet(exchange, null)) HttpExchange existing = this.exchange.getAndSet(null);
if (existing == exchange)
{ {
LOG.debug("{} disassociated from {}", exchange, this); LOG.debug("{} disassociated from {}", exchange, this);
if (success) if (success)
{
HttpFields responseHeaders = exchange.response().headers();
Collection<String> values = responseHeaders.getValuesCollection(HttpHeader.CONNECTION.asString());
if (values != null && values.contains("close"))
{
destination.remove(this);
close();
}
else
{ {
destination.release(this); destination.release(this);
} }
}
else else
{ {
destination.remove(this); destination.remove(this);
close(); close();
} }
} }
else else if (existing == null)
{ {
// It is possible that the exchange has already been disassociated, // It is possible that the exchange has already been disassociated,
// for example if the connection idle timeouts: this will fail // for example if the connection idle timeouts: this will fail
@ -225,6 +244,10 @@ public class HttpConnection extends AbstractConnection implements Connection
// and will arrive here without an exchange being present. // and will arrive here without an exchange being present.
// We just ignore this fact, as the exchange has already been processed // We just ignore this fact, as the exchange has already been processed
} }
else
{
throw new IllegalStateException();
}
} }
@Override @Override

View File

@ -19,27 +19,19 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
public class HttpContentResponse implements ContentResponse public class HttpContentResponse implements ContentResponse
{ {
private final Response response; private final Response response;
private final BufferingResponseListener listener; private final byte[] content;
public HttpContentResponse(Response response, BufferingResponseListener listener) public HttpContentResponse(Response response, byte[] content)
{ {
this.response = response; this.response = response;
this.listener = listener; this.content = content;
}
@Override
public Request request()
{
return response.request();
} }
@Override @Override
@ -81,6 +73,6 @@ public class HttpContentResponse implements ContentResponse
@Override @Override
public byte[] content() public byte[] content()
{ {
return listener.content(); return content;
} }
} }

View File

@ -18,18 +18,24 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.util.Queue; import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.Deque;
import java.util.concurrent.atomic.AtomicReference; import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.Attributes;
public class HttpConversation public class HttpConversation implements Attributes
{ {
private final Queue<HttpExchange> exchanges = new ConcurrentLinkedQueue<>(); private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final AtomicReference<Response.Listener> listener = new AtomicReference<>(); private final Deque<HttpExchange> exchanges = new ConcurrentLinkedDeque<>();
private final HttpClient client; private final HttpClient client;
private final long id; private final long id;
private volatile Response.Listener listener;
private volatile HttpExchange last;
public HttpConversation(HttpClient client, long id) public HttpConversation(HttpClient client, long id)
{ {
@ -42,32 +48,68 @@ public class HttpConversation
return id; return id;
} }
public Deque<HttpExchange> exchanges()
{
return exchanges;
}
public Response.Listener listener() public Response.Listener listener()
{ {
return listener.get(); return listener;
} }
public void listener(Response.Listener listener) public void listener(Response.Listener listener)
{ {
this.listener.set(listener); this.listener = listener;
} }
public void add(HttpExchange exchange) public HttpExchange last()
{ {
exchanges.offer(exchange); return last;
} }
public HttpExchange first() public void last(HttpExchange exchange)
{ {
return exchanges.peek(); if (last == null)
last = exchange;
} }
public void complete() public void complete()
{ {
listener.set(null);
client.removeConversation(this); client.removeConversation(this);
} }
@Override
public Object getAttribute(String name)
{
return attributes.get(name);
}
@Override
public void setAttribute(String name, Object attribute)
{
attributes.put(name, attribute);
}
@Override
public void removeAttribute(String name)
{
attributes.remove(name);
}
@Override
public Enumeration<String> getAttributeNames()
{
return Collections.enumeration(attributes.keySet());
}
@Override
public void clearAttributes()
{
attributes.clear();
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -40,6 +41,7 @@ public class HttpDestination implements Destination, AutoCloseable
private static final Logger LOG = Log.getLogger(HttpDestination.class); private static final Logger LOG = Log.getLogger(HttpDestination.class);
private final AtomicInteger connectionCount = new AtomicInteger(); private final AtomicInteger connectionCount = new AtomicInteger();
private final ResponseNotifier responseNotifier = new ResponseNotifier();
private final HttpClient client; private final HttpClient client;
private final String scheme; private final String scheme;
private final String host; private final String host;
@ -47,6 +49,7 @@ public class HttpDestination implements Destination, AutoCloseable
private final Queue<RequestPair> requests; private final Queue<RequestPair> requests;
private final BlockingQueue<Connection> idleConnections; private final BlockingQueue<Connection> idleConnections;
private final BlockingQueue<Connection> activeConnections; private final BlockingQueue<Connection> activeConnections;
private final RequestNotifier requestNotifier;
public HttpDestination(HttpClient client, String scheme, String host, int port) public HttpDestination(HttpClient client, String scheme, String host, int port)
{ {
@ -57,6 +60,7 @@ public class HttpDestination implements Destination, AutoCloseable
this.requests = new ArrayBlockingQueue<>(client.getMaxQueueSizePerAddress()); this.requests = new ArrayBlockingQueue<>(client.getMaxQueueSizePerAddress());
this.idleConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress()); this.idleConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress()); this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress());
this.requestNotifier = new RequestNotifier(client);
} }
protected BlockingQueue<Connection> getIdleConnections() protected BlockingQueue<Connection> getIdleConnections()
@ -109,10 +113,10 @@ public class HttpDestination implements Destination, AutoCloseable
else else
{ {
LOG.debug("Queued {}", request); LOG.debug("Queued {}", request);
notifyRequestQueued(request); requestNotifier.notifyQueued(request);
Connection connection = acquire(); Connection connection = acquire();
if (connection != null) if (connection != null)
process(connection); process(connection, false);
} }
} }
else else
@ -126,47 +130,6 @@ public class HttpDestination implements Destination, AutoCloseable
} }
} }
private void notifyRequestQueued(Request request)
{
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onQueued(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyRequestFailure(Request request, Throwable failure)
{
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onFailure(request, failure);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyResponseFailure(Response.Listener listener, Throwable failure)
{
try
{
if (listener != null)
listener.onFailure(null, failure);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public Future<Connection> newConnection() public Future<Connection> newConnection()
{ {
FutureCallback<Connection> result = new FutureCallback<>(); FutureCallback<Connection> result = new FutureCallback<>();
@ -207,7 +170,7 @@ public class HttpDestination implements Destination, AutoCloseable
public void completed(Connection connection) public void completed(Connection connection)
{ {
LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this); LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this);
process(connection); process(connection, true);
} }
@Override @Override
@ -223,8 +186,8 @@ public class HttpDestination implements Destination, AutoCloseable
RequestPair pair = requests.poll(); RequestPair pair = requests.poll();
if (pair != null) if (pair != null)
{ {
notifyRequestFailure(pair.request, x); requestNotifier.notifyFailure(pair.request, x);
notifyResponseFailure(pair.listener, x); responseNotifier.notifyComplete(pair.listener, new Result(pair.request, x, null));
} }
} }
}); });
@ -245,13 +208,13 @@ public class HttpDestination implements Destination, AutoCloseable
* *
* @param connection the new connection * @param connection the new connection
*/ */
protected void process(final Connection connection) protected void process(final Connection connection, boolean dispatch)
{ {
final RequestPair requestPair = requests.poll(); final RequestPair requestPair = requests.poll();
if (requestPair == null) if (requestPair == null)
{ {
LOG.debug("{} idle", connection); LOG.debug("{} idle", connection);
idleConnections.offer(connection); idleConnections.offer(connection); // TODO: check return value ?
if (!client.isRunning()) if (!client.isRunning())
{ {
LOG.debug("{} is stopping", client); LOG.debug("{} is stopping", client);
@ -262,7 +225,9 @@ public class HttpDestination implements Destination, AutoCloseable
else else
{ {
LOG.debug("{} active", connection); LOG.debug("{} active", connection);
activeConnections.offer(connection); activeConnections.offer(connection); // TODO: check return value ?
if (dispatch)
{
client.getExecutor().execute(new Runnable() client.getExecutor().execute(new Runnable()
{ {
@Override @Override
@ -272,6 +237,11 @@ public class HttpDestination implements Destination, AutoCloseable
} }
}); });
} }
else
{
connection.send(requestPair.request, requestPair.listener);
}
}
} }
public void release(Connection connection) public void release(Connection connection)
@ -280,7 +250,7 @@ public class HttpDestination implements Destination, AutoCloseable
if (client.isRunning()) if (client.isRunning())
{ {
activeConnections.remove(connection); activeConnections.remove(connection);
process(connection); process(connection, false);
} }
else else
{ {
@ -304,7 +274,7 @@ public class HttpDestination implements Destination, AutoCloseable
{ {
connection = acquire(); connection = acquire();
if (connection != null) if (connection != null)
process(connection); process(connection, false);
} }
} }
@ -323,8 +293,8 @@ public class HttpDestination implements Destination, AutoCloseable
RequestPair pair; RequestPair pair;
while ((pair = requests.poll()) != null) while ((pair = requests.poll()) != null)
{ {
notifyRequestFailure(pair.request, failure); requestNotifier.notifyFailure(pair.request, failure);
notifyResponseFailure(pair.listener, failure); responseNotifier.notifyComplete(pair.listener, new Result(pair.request, failure, null));
} }
connectionCount.set(0); connectionCount.set(0);

View File

@ -22,14 +22,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpExchange public class HttpExchange
{ {
private static final int REQUEST_SUCCESS = 1; private static final Logger LOG = Log.getLogger(HttpExchange.class);
private static final int RESPONSE_SUCCESS = 2;
private static final int REQUEST_RESPONSE_SUCCESS = REQUEST_SUCCESS + RESPONSE_SUCCESS;
private final AtomicInteger done = new AtomicInteger(); private final AtomicInteger complete = new AtomicInteger();
private final HttpConversation conversation; private final HttpConversation conversation;
private final HttpConnection connection; private final HttpConnection connection;
private final Request request; private final Request request;
@ -42,7 +42,7 @@ public class HttpExchange
this.connection = connection; this.connection = connection;
this.request = request; this.request = request;
this.listener = listener; this.listener = listener;
this.response = new HttpResponse(request, listener); this.response = new HttpResponse(listener);
} }
public HttpConversation conversation() public HttpConversation conversation()
@ -70,29 +70,51 @@ public class HttpExchange
connection.receive(); connection.receive();
} }
public void requestComplete(boolean success) public boolean requestComplete(boolean success)
{ {
done(success, REQUEST_SUCCESS); int requestSuccess = 0b0011;
int requestFailure = 0b0001;
return complete(success ? requestSuccess : requestFailure);
} }
public void responseComplete(boolean success) public boolean responseComplete(boolean success)
{ {
done(success, RESPONSE_SUCCESS); int responseSuccess = 0b1100;
int responseFailure = 0b0100;
return complete(success ? responseSuccess : responseFailure);
} }
private void done(boolean success, int kind) /**
* This method needs to atomically compute whether this exchange is completed,
* that is both request and responses are completed (either with a success or
* a failure).
*
* Furthermore, this method needs to atomically compute whether the exchange
* has completed successfully (both request and response are successful) or not.
*
* To do this, we use 2 bits for the request (one to indicate completion, one
* to indicate success), and similarly for the response.
* By using {@link AtomicInteger} to atomically sum these codes we can know
* whether the exchange is completed and whether is successful.
*
* @param code the bits representing the status code for either the request or the response
* @return whether the exchange completed (either successfully or not)
*/
private boolean complete(int code)
{ {
if (success) int status = complete.addAndGet(code);
int completed = 0b0101;
if ((status & completed) == completed)
{ {
if (done.addAndGet(kind) == REQUEST_RESPONSE_SUCCESS) LOG.debug("{} complete", this);
{ // Request and response completed
connection.completed(this, true); if (this == conversation.last())
} conversation.complete();
} int success = 0b1111;
else connection.complete(this, status == success);
{ return true;
connection.completed(this, false);
} }
return false;
} }
@Override @Override

View File

@ -23,10 +23,10 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.api.CookieStore; import org.eclipse.jetty.client.api.CookieStore;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpCookie; import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpParser;
@ -41,8 +41,8 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{ {
private static final Logger LOG = Log.getLogger(HttpReceiver.class); private static final Logger LOG = Log.getLogger(HttpReceiver.class);
private final AtomicBoolean complete = new AtomicBoolean();
private final HttpParser parser = new HttpParser(this); private final HttpParser parser = new HttpParser(this);
private final ResponseNotifier notifier = new ResponseNotifier();
private final HttpConnection connection; private final HttpConnection connection;
private volatile boolean failed; private volatile boolean failed;
@ -100,22 +100,34 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{ {
HttpExchange exchange = connection.getExchange(); HttpExchange exchange = connection.getExchange();
HttpConversation conversation = exchange.conversation(); HttpConversation conversation = exchange.conversation();
// Probe the protocol listeners
HttpClient client = connection.getHttpClient();
HttpResponse response = exchange.response(); HttpResponse response = exchange.response();
Response.Listener listener = client.lookup(status);
if (listener == null)
{
listener = conversation.first().listener();
complete.set(true);
}
conversation.listener(listener);
response.version(version).status(status).reason(reason); response.version(version).status(status).reason(reason);
// Probe the protocol handlers
Response.Listener currentListener = exchange.listener();
Response.Listener initialListener = conversation.exchanges().peekFirst().listener();
HttpClient client = connection.getHttpClient();
Response.Listener handlerListener = client.lookup(exchange.request(), response);
if (handlerListener == null)
{
conversation.last(exchange);
if (currentListener == initialListener)
conversation.listener(initialListener);
else
conversation.listener(new MultipleResponseListener(currentListener, initialListener));
}
else
{
if (currentListener == initialListener)
conversation.listener(handlerListener);
else
conversation.listener(new MultipleResponseListener(currentListener, handlerListener));
}
LOG.debug("Receiving {}", response); LOG.debug("Receiving {}", response);
notifyBegin(listener, response); notifier.notifyBegin(conversation.listener(), response);
return false; return false;
} }
@ -153,7 +165,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
HttpConversation conversation = exchange.conversation(); HttpConversation conversation = exchange.conversation();
HttpResponse response = exchange.response(); HttpResponse response = exchange.response();
LOG.debug("Headers {}", response); LOG.debug("Headers {}", response);
notifyHeaders(conversation.listener(), response); notifier.notifyHeaders(conversation.listener(), response);
return false; return false;
} }
@ -164,37 +176,44 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
HttpConversation conversation = exchange.conversation(); HttpConversation conversation = exchange.conversation();
HttpResponse response = exchange.response(); HttpResponse response = exchange.response();
LOG.debug("Content {}: {} bytes", response, buffer.remaining()); LOG.debug("Content {}: {} bytes", response, buffer.remaining());
notifyContent(conversation.listener(), response, buffer); notifier.notifyContent(conversation.listener(), response, buffer);
return false; return false;
} }
@Override @Override
public boolean messageComplete(long contentLength) public boolean messageComplete(long contentLength)
{ {
if (!failed) HttpExchange exchange = connection.getExchange();
// The exchange may be null if it was failed before
if (exchange != null && !failed)
success(); success();
return true; return true;
} }
protected void success() protected void success()
{ {
parser.reset();
HttpExchange exchange = connection.getExchange(); HttpExchange exchange = connection.getExchange();
HttpConversation conversation = exchange.conversation();
HttpResponse response = exchange.response(); HttpResponse response = exchange.response();
LOG.debug("Received {}", response); LOG.debug("Received {}", response);
parser.reset(); boolean exchangeComplete = exchange.responseComplete(true);
failed = false;
boolean complete = this.complete.getAndSet(false);
exchange.responseComplete(true); HttpConversation conversation = exchange.conversation();
notifySuccess(conversation.listener(), response); notifier.notifySuccess(conversation.listener(), response);
if (complete) if (exchangeComplete)
conversation.complete(); {
Result result = new Result(exchange.request(), response);
notifier.notifyComplete(conversation.listener(), result);
}
} }
protected void fail(Throwable failure) protected void fail(Throwable failure)
{ {
parser.close();
failed = true;
HttpExchange exchange = connection.getExchange(); HttpExchange exchange = connection.getExchange();
// In case of a response error, the failure has already been notified // In case of a response error, the failure has already been notified
@ -203,17 +222,18 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
if (exchange == null) if (exchange == null)
return; return;
HttpConversation conversation = exchange.conversation();
HttpResponse response = exchange.response(); HttpResponse response = exchange.response();
LOG.debug("Failed {} {}", response, failure); LOG.debug("Failed {} {}", response, failure);
parser.reset(); boolean exchangeComplete = exchange.responseComplete(false);
failed = true;
complete.set(false);
exchange.responseComplete(false); HttpConversation conversation = exchange.conversation();
notifyFailure(conversation.listener(), response, failure); notifier.notifyFailure(conversation.listener(), response, failure);
conversation.complete(); if (exchangeComplete)
{
Result result = new Result(exchange.request(), response, failure);
notifier.notifyComplete(conversation.listener(), result);
}
} }
@Override @Override
@ -227,77 +247,78 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
public void badMessage(int status, String reason) public void badMessage(int status, String reason)
{ {
HttpExchange exchange = connection.getExchange(); HttpExchange exchange = connection.getExchange();
exchange.response().status(status).reason(reason); HttpResponse response = exchange.response();
fail(new HttpResponseException()); response.status(status).reason(reason);
} fail(new HttpResponseException("HTTP protocol violation: bad response", response));
private void notifyBegin(Response.Listener listener, Response response)
{
try
{
if (listener != null)
listener.onBegin(response);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyHeaders(Response.Listener listener, Response response)
{
try
{
if (listener != null)
listener.onHeaders(response);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyContent(Response.Listener listener, Response response, ByteBuffer buffer)
{
try
{
if (listener != null)
listener.onContent(response, buffer);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifySuccess(Response.Listener listener, Response response)
{
try
{
if (listener != null)
listener.onSuccess(response);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyFailure(Response.Listener listener, Response response, Throwable failure)
{
try
{
if (listener != null)
listener.onFailure(response, failure);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
} }
public void idleTimeout() public void idleTimeout()
{ {
fail(new TimeoutException()); fail(new TimeoutException());
} }
private class MultipleResponseListener implements Response.Listener
{
private final ResponseNotifier notifier = new ResponseNotifier();
private final Response.Listener[] listeners;
private MultipleResponseListener(Response.Listener... listeners)
{
this.listeners = listeners;
}
@Override
public void onBegin(Response response)
{
for (Response.Listener listener : listeners)
{
notifier.notifyBegin(listener, response);
}
}
@Override
public void onHeaders(Response response)
{
for (Response.Listener listener : listeners)
{
notifier.notifyHeaders(listener, response);
}
}
@Override
public void onContent(Response response, ByteBuffer content)
{
for (Response.Listener listener : listeners)
{
notifier.notifyContent(listener, response, content);
}
}
@Override
public void onSuccess(Response response)
{
for (Response.Listener listener : listeners)
{
notifier.notifySuccess(listener, response);
}
}
@Override
public void onFailure(Response response, Throwable failure)
{
for (Response.Listener listener : listeners)
{
notifier.notifyFailure(listener, response, failure);
}
}
@Override
public void onComplete(Result result)
{
for (Response.Listener listener : listeners)
{
notifier.notifyComplete(listener, result);
}
}
}
} }

View File

@ -18,10 +18,12 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.URI; import java.net.URI;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.nio.charset.UnsupportedCharsetException; import java.nio.charset.UnsupportedCharsetException;
import java.nio.file.Path;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -30,7 +32,9 @@ import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.PathContentProvider;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
@ -150,6 +154,18 @@ public class HttpRequest implements Request
return this; return this;
} }
@Override
public String uri()
{
String scheme = scheme();
String result = scheme + "://" + host();
int port = port();
result += "http".equals(scheme) && port != 80 ? ":" + port : "";
result += "https".equals(scheme) && port != 443 ? ":" + port : "";
result += path();
return result;
}
@Override @Override
public HttpVersion version() public HttpVersion version()
{ {
@ -232,13 +248,21 @@ public class HttpRequest implements Request
} }
@Override @Override
public Request decoder(ContentDecoder decoder) public Request file(Path file) throws IOException
{ {
return this; return file(file, "application/octet-stream");
} }
@Override @Override
public Request cookie(String key, String value) public Request file(Path file, String contentType) throws IOException
{
if (contentType != null)
header(HttpHeader.CONTENT_TYPE.asString(), contentType);
return content(new PathContentProvider(file));
}
@Override
public Request decoder(ContentDecoder decoder)
{ {
return this; return this;
} }
@ -265,25 +289,22 @@ public class HttpRequest implements Request
@Override @Override
public Future<ContentResponse> send() public Future<ContentResponse> send()
{ {
final FutureCallback<ContentResponse> result = new FutureCallback<>(); final FutureCallback<ContentResponse> callback = new FutureCallback<>();
BufferingResponseListener listener = new BufferingResponseListener() BufferingResponseListener listener = new BufferingResponseListener()
{ {
@Override @Override
public void onSuccess(Response response) public void onComplete(Result result)
{ {
super.onSuccess(response); super.onComplete(result);
result.completed(new HttpContentResponse(response, this)); HttpContentResponse contentResponse = new HttpContentResponse(result.getResponse(), content());
} if (!result.isFailed())
callback.completed(contentResponse);
@Override else
public void onFailure(Response response, Throwable failure) callback.failed(contentResponse, result.getFailure());
{
super.onFailure(response, failure);
result.failed(new HttpContentResponse(response, this), failure);
} }
}; };
send(listener); send(listener);
return result; return callback;
} }
@Override @Override

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
@ -26,15 +25,13 @@ import org.eclipse.jetty.http.HttpVersion;
public class HttpResponse implements Response public class HttpResponse implements Response
{ {
private final HttpFields headers = new HttpFields(); private final HttpFields headers = new HttpFields();
private final Request request;
private final Listener listener; private final Listener listener;
private HttpVersion version; private HttpVersion version;
private int status; private int status;
private String reason; private String reason;
public HttpResponse(Request request, Response.Listener listener) public HttpResponse(Response.Listener listener)
{ {
this.request = request;
this.listener = listener; this.listener = listener;
} }
@ -78,12 +75,6 @@ public class HttpResponse implements Response
return headers; return headers;
} }
@Override
public Request request()
{
return request;
}
@Override @Override
public Listener listener() public Listener listener()
{ {

View File

@ -18,9 +18,20 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Response;
public class HttpResponseException extends RuntimeException public class HttpResponseException extends RuntimeException
{ {
public HttpResponseException() private final Response response;
public HttpResponseException(String message, Response response)
{ {
super(message);
this.response = response;
}
public Response getResponse()
{
return response;
} }
} }

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
@ -28,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpGenerator; import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
@ -43,37 +42,31 @@ public class HttpSender
private static final Logger LOG = Log.getLogger(HttpSender.class); private static final Logger LOG = Log.getLogger(HttpSender.class);
private final HttpGenerator generator = new HttpGenerator(); private final HttpGenerator generator = new HttpGenerator();
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>(); private final ResponseNotifier responseNotifier = new ResponseNotifier();
private final HttpConnection connection; private final HttpConnection connection;
private final RequestNotifier requestNotifier;
private long contentLength; private long contentLength;
private Iterator<ByteBuffer> contentChunks; private Iterator<ByteBuffer> contentChunks;
private ByteBuffer header; private ByteBuffer header;
private ByteBuffer chunk; private ByteBuffer chunk;
private boolean headersComplete; private volatile boolean committed;
private boolean failed; private volatile boolean failed;
public HttpSender(HttpConnection connection) public HttpSender(HttpConnection connection)
{ {
this.connection = connection; this.connection = connection;
this.requestNotifier = new RequestNotifier(connection.getHttpClient());
} }
public void send(HttpExchange exchange) public void send(HttpExchange exchange)
{
if (this.exchange.compareAndSet(null, exchange))
{ {
LOG.debug("Sending {}", exchange.request()); LOG.debug("Sending {}", exchange.request());
notifyRequestBegin(exchange.request()); requestNotifier.notifyBegin(exchange.request());
ContentProvider content = exchange.request().content(); ContentProvider content = exchange.request().content();
this.contentLength = content == null ? -1 : content.length(); this.contentLength = content == null ? -1 : content.length();
this.contentChunks = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator(); this.contentChunks = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
send(); send();
} }
else
{
throw new IllegalStateException();
}
}
private void send() private void send()
{ {
@ -81,8 +74,9 @@ public class HttpSender
{ {
HttpClient client = connection.getHttpClient(); HttpClient client = connection.getHttpClient();
EndPoint endPoint = connection.getEndPoint(); EndPoint endPoint = connection.getEndPoint();
HttpExchange exchange = connection.getExchange();
ByteBufferPool byteBufferPool = client.getByteBufferPool(); ByteBufferPool byteBufferPool = client.getByteBufferPool();
final Request request = exchange.get().request(); final Request request = exchange.request();
HttpGenerator.RequestInfo info = null; HttpGenerator.RequestInfo info = null;
ByteBuffer content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER; ByteBuffer content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
boolean lastContent = !contentChunks.hasNext(); boolean lastContent = !contentChunks.hasNext();
@ -133,7 +127,8 @@ public class HttpSender
@Override @Override
protected void pendingCompleted() protected void pendingCompleted()
{ {
notifyRequestHeaders(request); if (!committed)
committed(request);
send(); send();
} }
@ -153,11 +148,9 @@ public class HttpSender
if (callback.completed()) if (callback.completed())
{ {
if (!headersComplete) if (!committed)
{ committed(request);
headersComplete = true;
notifyRequestHeaders(request);
}
releaseBuffers(); releaseBuffers();
content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER; content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER;
lastContent = !contentChunks.hasNext(); lastContent = !contentChunks.hasNext();
@ -186,7 +179,7 @@ public class HttpSender
} }
} }
} }
catch (IOException x) catch (Exception x)
{ {
LOG.debug(x); LOG.debug(x);
fail(x); fail(x);
@ -197,20 +190,35 @@ public class HttpSender
} }
} }
protected void committed(Request request)
{
LOG.debug("Committed {}", request);
committed = true;
requestNotifier.notifyHeaders(request);
}
protected void success() protected void success()
{ {
// Cleanup first // Cleanup first
generator.reset(); generator.reset();
headersComplete = false; committed = false;
// Notify after // Notify after
HttpExchange exchange = this.exchange.getAndSet(null); HttpExchange exchange = connection.getExchange();
LOG.debug("Sent {}", exchange.request()); Request request = exchange.request();
exchange.requestComplete(true); LOG.debug("Sent {}", request);
boolean exchangeCompleted = exchange.requestComplete(true);
// It is important to notify *after* we reset because // It is important to notify *after* we reset because
// the notification may trigger another request/response // the notification may trigger another request/response
notifyRequestSuccess(exchange.request()); requestNotifier.notifySuccess(request);
if (exchangeCompleted)
{
HttpConversation conversation = exchange.conversation();
Result result = new Result(request, exchange.response());
responseNotifier.notifyComplete(conversation.listener(), result);
}
} }
protected void fail(Throwable failure) protected void fail(Throwable failure)
@ -224,12 +232,21 @@ public class HttpSender
failed = true; failed = true;
// Notify after // Notify after
HttpExchange exchange = this.exchange.getAndSet(null); HttpExchange exchange = connection.getExchange();
LOG.debug("Failed {}", exchange.request()); Request request = exchange.request();
exchange.requestComplete(false); LOG.debug("Failed {}", request);
notifyRequestFailure(exchange.request(), failure); boolean exchangeCompleted = exchange.requestComplete(false);
notifyResponseFailure(exchange.listener(), failure); if (!exchangeCompleted && !committed)
exchangeCompleted = exchange.responseComplete(false);
requestNotifier.notifyFailure(request, failure);
if (exchangeCompleted)
{
HttpConversation conversation = exchange.conversation();
Result result = new Result(request, failure, exchange.response());
responseNotifier.notifyComplete(conversation.listener(), result);
}
} }
private void releaseBuffers() private void releaseBuffers()
@ -247,75 +264,6 @@ public class HttpSender
} }
} }
private void notifyRequestBegin(Request request)
{
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onBegin(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyRequestHeaders(Request request)
{
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onHeaders(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyRequestSuccess(Request request)
{
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onSuccess(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyRequestFailure(Request request, Throwable failure)
{
Request.Listener listener = request.listener();
try
{
if (listener != null)
listener.onFailure(request, failure);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private void notifyResponseFailure(Response.Listener listener, Throwable failure)
{
try
{
if (listener != null)
listener.onFailure(null, failure);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
private static abstract class StatefulExecutorCallback implements Callback<Void>, Runnable private static abstract class StatefulExecutorCallback implements Callback<Void>, Runnable
{ {
private final AtomicReference<State> state = new AtomicReference<>(State.INCOMPLETE); private final AtomicReference<State> state = new AtomicReference<>(State.INCOMPLETE);

View File

@ -18,42 +18,12 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.net.URI;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
public class RedirectionProtocolListener extends Response.Listener.Adapter public interface ProtocolHandler
{ {
private final HttpClient client; public boolean accept(Request request, Response response);
public RedirectionProtocolListener(HttpClient client) public Response.Listener getResponseListener();
{
this.client = client;
}
@Override
public void onSuccess(Response response)
{
switch (response.status())
{
case 301: // GET or HEAD only allowed, keep the method
{
break;
}
case 302:
case 303: // use GET for next request
{
String location = response.headers().get("location");
Request redirect = client.newRequest(response.request().id(), URI.create(location));
redirect.send(this);
}
}
}
@Override
public void onFailure(Response response, Throwable failure)
{
// TODO
}
} }

View File

@ -0,0 +1,149 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
public class RedirectProtocolHandler extends Response.Listener.Adapter implements ProtocolHandler
{
private static final String ATTRIBUTE = RedirectProtocolHandler.class.getName() + ".redirect";
private final ResponseNotifier notifier = new ResponseNotifier();
private final HttpClient client;
public RedirectProtocolHandler(HttpClient client)
{
this.client = client;
}
@Override
public boolean accept(Request request, Response response)
{
switch (response.status())
{
case 301:
case 302:
case 303:
case 307:
return true;
}
return false;
}
@Override
public Response.Listener getResponseListener()
{
return this;
}
@Override
public void onComplete(Result result)
{
if (!result.isFailed())
{
Request request = result.getRequest();
Response response = result.getResponse();
String location = response.headers().get("location");
int status = response.status();
switch (status)
{
case 301:
{
if (request.method() == HttpMethod.GET || request.method() == HttpMethod.HEAD)
redirect(result, request.method(), location);
else
fail(result, new HttpResponseException("HTTP protocol violation: received 301 for non GET or HEAD request", response));
break;
}
case 302:
case 303:
{
// Redirect must be done using GET
redirect(result, HttpMethod.GET, location);
break;
}
case 307:
{
// Keep same method
redirect(result, request.method(), location);
break;
}
default:
{
fail(result, new HttpResponseException("Unhandled HTTP status code " + status, response));
break;
}
}
}
else
{
fail(result, result.getFailure());
}
}
private void redirect(Result result, HttpMethod method, String location)
{
Request request = result.getRequest();
HttpConversation conversation = client.getConversation(request);
Integer redirects = (Integer)conversation.getAttribute(ATTRIBUTE);
if (redirects == null)
redirects = 0;
if (redirects < client.getMaxRedirects())
{
++redirects;
conversation.setAttribute(ATTRIBUTE, redirects);
// TODO: no, reuse the same request object, just have a setter for the URI
Request redirect = client.newRequest(request.id(), location);
// Use given method
redirect.method(method);
// Copy headers
for (HttpFields.Field header : request.headers())
redirect.header(header.getName(), header.getValue());
// Copy content
redirect.content(request.content());
redirect.send(new Adapter());
}
else
{
fail(result, new HttpResponseException("Max redirects exceeded " + redirects, result.getResponse()));
}
}
private void fail(Result result, Throwable failure)
{
Request request = result.getRequest();
Response response = result.getResponse();
HttpConversation conversation = client.getConversation(request);
Response.Listener listener = conversation.exchanges().peekFirst().listener();
// TODO: should we reply all event, or just the failure ?
notifier.notifyFailure(listener, response, failure);
notifier.notifyComplete(listener, new Result(request, response, failure));
}
}

View File

@ -0,0 +1,135 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class RequestNotifier
{
private static final Logger LOG = Log.getLogger(ResponseNotifier.class);
private final HttpClient client;
public RequestNotifier(HttpClient client)
{
this.client = client;
}
public void notifyQueued(Request request)
{
notifyQueued(request.listener(), request);
for (Request.Listener listener : client.getRequestListeners())
notifyQueued(listener, request);
}
private void notifyQueued(Request.Listener listener, Request request)
{
try
{
if (listener != null)
listener.onQueued(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifyBegin(Request request)
{
notifyBegin(request.listener(), request);
for (Request.Listener listener : client.getRequestListeners())
notifyBegin(listener, request);
}
private void notifyBegin(Request.Listener listener, Request request)
{
try
{
if (listener != null)
listener.onBegin(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifyHeaders(Request request)
{
notifyHeaders(request.listener(), request);
for (Request.Listener listener : client.getRequestListeners())
notifyHeaders(listener, request);
}
private void notifyHeaders(Request.Listener listener, Request request)
{
try
{
if (listener != null)
listener.onHeaders(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifySuccess(Request request)
{
notifySuccess(request.listener(), request);
for (Request.Listener listener : client.getRequestListeners())
notifySuccess(listener, request);
}
private void notifySuccess(Request.Listener listener, Request request)
{
try
{
if (listener != null)
listener.onSuccess(request);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifyFailure(Request request, Throwable failure)
{
notifyFailure(request.listener(), request, failure);
for (Request.Listener listener : client.getRequestListeners())
notifyFailure(listener, request, failure);
}
private void notifyFailure(Request.Listener listener, Request request, Throwable failure)
{
try
{
if (listener != null)
listener.onFailure(request, failure);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
}

View File

@ -0,0 +1,109 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class ResponseNotifier
{
private static final Logger LOG = Log.getLogger(ResponseNotifier.class);
public void notifyBegin(Response.Listener listener, Response response)
{
try
{
if (listener != null)
listener.onBegin(response);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifyHeaders(Response.Listener listener, Response response)
{
try
{
if (listener != null)
listener.onHeaders(response);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifyContent(Response.Listener listener, Response response, ByteBuffer buffer)
{
try
{
if (listener != null)
listener.onContent(response, buffer);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifySuccess(Response.Listener listener, Response response)
{
try
{
if (listener != null)
listener.onSuccess(response);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifyFailure(Response.Listener listener, Response response, Throwable failure)
{
try
{
if (listener != null)
listener.onFailure(response, failure);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
public void notifyComplete(Response.Listener listener, Result result)
{
try
{
if (listener != null)
listener.onComplete(result);
}
catch (Exception x)
{
LOG.info("Exception while notifying listener " + listener, x);
}
}
}

View File

@ -0,0 +1,48 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.api;
public interface Authentication
{
boolean matches(String type, String uri, String realm);
void authenticate(Request request);
public static class Result
{
private final String uri;
private final Authentication authentication;
public Result(String uri, Authentication authentication)
{
this.uri = uri;
this.authentication = authentication;
}
public String getURI()
{
return uri;
}
public Authentication getAuthentication()
{
return authentication;
}
}
}

View File

@ -0,0 +1,34 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.api;
public interface AuthenticationStore
{
public void addAuthentication(Authentication authentication);
public void removeAuthentication(Authentication authentication);
public Authentication findAuthentication(String type, String uri, String realm);
public void addAuthenticationResult(Authentication.Result result);
public void removeAuthenticationResults();
public Authentication findAuthenticationResult(String uri);
}

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.client.api; package org.eclipse.jetty.client.api;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
@ -45,6 +47,8 @@ public interface Request
Request path(String path); Request path(String path);
String uri();
HttpVersion version(); HttpVersion version();
Request version(HttpVersion version); Request version(HttpVersion version);
@ -61,9 +65,11 @@ public interface Request
Request content(ContentProvider buffer); Request content(ContentProvider buffer);
Request decoder(ContentDecoder decoder); Request file(Path file) throws IOException;
Request cookie(String key, String value); Request file(Path file, String contentType) throws IOException;
Request decoder(ContentDecoder decoder);
String agent(); String agent();

View File

@ -25,8 +25,6 @@ import org.eclipse.jetty.http.HttpVersion;
public interface Response public interface Response
{ {
Request request();
Listener listener(); Listener listener();
HttpVersion version(); HttpVersion version();
@ -51,6 +49,8 @@ public interface Response
public void onFailure(Response response, Throwable failure); public void onFailure(Response response, Throwable failure);
public void onComplete(Result result);
public static class Adapter implements Listener public static class Adapter implements Listener
{ {
@Override @Override
@ -77,6 +77,11 @@ public interface Response
public void onFailure(Response response, Throwable failure) public void onFailure(Response response, Throwable failure)
{ {
} }
@Override
public void onComplete(Result result)
{
}
} }
} }
} }

View File

@ -0,0 +1,70 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.api;
public class Result
{
private final Request request;
private final Throwable requestFailure;
private final Response response;
private final Throwable responseFailure;
public Result(Request request, Response response)
{
this(request, null, response, null);
}
public Result(Request request, Response response, Throwable responseFailure)
{
this(request, null, response, responseFailure);
}
public Result(Request request, Throwable requestFailure, Response response)
{
this(request, requestFailure, response, null);
}
private Result(Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{
this.request = request;
this.requestFailure = requestFailure;
this.response = response;
this.responseFailure = responseFailure;
}
public Request getRequest()
{
return request;
}
public Response getResponse()
{
return response;
}
public boolean isFailed()
{
return getFailure() != null;
}
public Throwable getFailure()
{
return responseFailure != null ? responseFailure : requestFailure;
}
}

View File

@ -0,0 +1,71 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.io.UnsupportedEncodingException;
import java.nio.charset.UnsupportedCharsetException;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
public class BasicAuthentication implements Authentication
{
private final String uri;
private final String realm;
private final String user;
private final String password;
public BasicAuthentication(String uri, String realm, String user, String password)
{
this.uri = uri;
this.realm = realm;
this.user = user;
this.password = password;
}
@Override
public boolean matches(String type, String uri, String realm)
{
if (!"basic".equalsIgnoreCase(type))
return false;
if (!uri.startsWith(this.uri))
return false;
return this.realm.equals(realm);
}
@Override
public void authenticate(Request request)
{
String encoding = StringUtil.__ISO_8859_1;
try
{
String value = "Basic " + B64Code.encode(user + ":" + password, encoding);
request.header(HttpHeader.AUTHORIZATION.asString(), value);
}
catch (UnsupportedEncodingException x)
{
throw new UnsupportedCharsetException(encoding);
}
}
}

View File

@ -19,32 +19,65 @@
package org.eclipse.jetty.client.util; package org.eclipse.jetty.client.util;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException;
import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentProvider;
public class ByteBufferContentProvider implements ContentProvider public class ByteBufferContentProvider implements ContentProvider
{ {
private final ByteBuffer[] buffers; private final ByteBuffer[] buffers;
private final int length;
public ByteBufferContentProvider(ByteBuffer... buffers) public ByteBufferContentProvider(ByteBuffer... buffers)
{ {
this.buffers = buffers; this.buffers = buffers;
int length = 0;
for (ByteBuffer buffer : buffers)
length += buffer.remaining();
this.length = length;
} }
@Override @Override
public long length() public long length()
{ {
int length = 0;
for (ByteBuffer buffer : buffers)
length += buffer.remaining();
return length; return length;
} }
@Override @Override
public Iterator<ByteBuffer> iterator() public Iterator<ByteBuffer> iterator()
{ {
return Arrays.asList(buffers).iterator(); return new Iterator<ByteBuffer>()
{
private int index;
@Override
public boolean hasNext()
{
return index < buffers.length;
}
@Override
public ByteBuffer next()
{
try
{
ByteBuffer buffer = buffers[index];
buffers[index] = buffer.slice();
++index;
return buffer;
}
catch (ArrayIndexOutOfBoundsException x)
{
throw new NoSuchElementException();
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
} }
} }

View File

@ -0,0 +1,82 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.eclipse.jetty.client.api.ContentProvider;
public class InputStreamContentProvider implements ContentProvider
{
private final InputStream input;
private final long length;
private final int capacity;
public InputStreamContentProvider(InputStream input)
{
this(input, -1);
}
public InputStreamContentProvider(InputStream input, long length)
{
this(input, length, length <= 0 ? 4096 : (int)Math.min(4096, length));
}
public InputStreamContentProvider(InputStream input, long length, int capacity)
{
this.input = input;
this.length = length;
this.capacity = capacity;
}
@Override
public long length()
{
return length;
}
@Override
public Iterator<ByteBuffer> iterator()
{
return null; // TODO
}
private class LazyIterator implements Iterator<ByteBuffer>
{
@Override
public boolean hasNext()
{
return false;
}
@Override
public ByteBuffer next()
{
return null;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}
}

View File

@ -22,10 +22,27 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.SelectChannelConnector; import org.eclipse.jetty.server.SelectChannelConnector;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Rule;
import org.junit.rules.TestWatchman;
import org.junit.runners.model.FrameworkMethod;
public class AbstractHttpClientServerTest public class AbstractHttpClientServerTest
{ {
@Rule
public final TestWatchman testName = new TestWatchman()
{
@Override
public void starting(FrameworkMethod method)
{
super.starting(method);
System.err.printf("Running %s.%s()%n",
method.getMethod().getDeclaringClass().getName(),
method.getName());
}
};
protected Server server; protected Server server;
protected HttpClient client; protected HttpClient client;
protected NetworkConnector connector; protected NetworkConnector connector;
@ -38,7 +55,9 @@ public class AbstractHttpClientServerTest
server.setHandler(handler); server.setHandler(handler);
server.start(); server.start();
client = new HttpClient(); QueuedThreadPool executor = new QueuedThreadPool();
executor.setName(executor.getName() + "-client");
client = new HttpClient(executor);
client.start(); client.start();
} }

View File

@ -0,0 +1,169 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.BasicAuthentication;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.IO;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
{
@Test
public void test_BasicAuthentication_WithChallenge() throws Exception
{
start(new BasicAuthenticationHandler());
AuthenticationStore authenticationStore = client.getAuthenticationStore();
String realm = "test";
final AtomicInteger requests = new AtomicInteger();
Request.Listener.Adapter requestListener = new Request.Listener.Adapter()
{
@Override
public void onSuccess(Request request)
{
requests.incrementAndGet();
}
};
client.getRequestListeners().add(requestListener);
// Request without Authentication causes a 401
Request request = client.newRequest("localhost", connector.getLocalPort())
.path("/test")
.param("type", "Basic")
.param("realm", realm);
ContentResponse response = request.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(401, response.status());
Assert.assertEquals(1, requests.get());
client.getRequestListeners().remove(requestListener);
requests.set(0);
String user = "jetty";
String password = "rocks";
authenticationStore.addAuthentication(new BasicAuthentication("http://localhost:" + connector.getLocalPort(), realm, user, password));
requestListener = new Request.Listener.Adapter()
{
@Override
public void onSuccess(Request request)
{
requests.incrementAndGet();
}
};
client.getRequestListeners().add(requestListener);
// Request with authentication causes a 401 (no previous successful authentication) + 200
request.param("user", user).param("password", password);
response = request.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertEquals(2, requests.get());
client.getRequestListeners().remove(requestListener);
requests.set(0);
requestListener = new Request.Listener.Adapter()
{
@Override
public void onSuccess(Request request)
{
requests.incrementAndGet();
}
};
client.getRequestListeners().add(requestListener);
// Further requests do not trigger 401 because there is a previous successful authentication
// Remove existing header to be sure it's added by the implementation
request.header(HttpHeader.AUTHORIZATION.asString(), null);
response = request.send().get(555, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertEquals(1, requests.get());
client.getRequestListeners().remove(requestListener);
requests.set(0);
}
private class BasicAuthenticationHandler extends AbstractHandler
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
String type = request.getParameter("type");
String authorization = request.getHeader(HttpHeader.AUTHORIZATION.asString());
if (authorization == null)
{
String realm = request.getParameter("realm");
response.setStatus(401);
switch (type)
{
case "Basic":
{
response.setHeader("WWW-Authenticate", "Basic realm=\"" + realm + "\"");
break;
}
default:
{
throw new IllegalStateException();
}
}
}
else
{
switch (type)
{
case "Basic":
{
String user = request.getParameter("user");
String password = request.getParameter("password");
String expected = "Basic " + B64Code.encode(user + ":" + password);
if (!expected.equals(authorization))
throw new IOException(expected + " != " + authorization);
IO.copy(request.getInputStream(), response.getOutputStream());
break;
}
default:
{
throw new IllegalStateException();
}
}
}
}
finally
{
baseRequest.setHandled(true);
}
}
}
}

View File

@ -0,0 +1,196 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.IO;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.fail;
public class HttpClientRedirectTest extends AbstractHttpClientServerTest
{
@Before
public void init() throws Exception
{
start(new RedirectHandler());
}
@Test
public void test_303() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/303/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
@Test
public void test_303_302() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/303/localhost/302/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
@Test
public void test_303_302_OnDifferentDestinations() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/303/127.0.0.1/302/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
@Test
public void test_301() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.HEAD)
.path("/301/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
@Test
public void test_301_WithWrongMethod() throws Exception
{
try
{
client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.path("/301/localhost/done")
.send().get(5, TimeUnit.SECONDS);
fail();
}
catch (ExecutionException x)
{
HttpResponseException xx = (HttpResponseException)x.getCause();
Response response = xx.getResponse();
Assert.assertNotNull(response);
Assert.assertEquals(301, response.status());
Assert.assertTrue(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
}
@Test
public void test_307_WithRequestContent() throws Exception
{
byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.path("/307/localhost/done")
.content(new ByteBufferContentProvider(ByteBuffer.wrap(data)))
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
Assert.assertArrayEquals(data, response.content());
}
@Test
public void testMaxRedirections() throws Exception
{
client.setMaxRedirects(1);
try
{
client.newRequest("localhost", connector.getLocalPort())
.path("/303/localhost/302/localhost/done")
.send().get(5, TimeUnit.SECONDS);
fail();
}
catch (ExecutionException x)
{
HttpResponseException xx = (HttpResponseException)x.getCause();
Response response = xx.getResponse();
Assert.assertNotNull(response);
Assert.assertEquals(302, response.status());
Assert.assertTrue(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
}
@Test
public void test_303_WithConnectionClose_WithBigRequest() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/303/localhost/done?close=true")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
private class RedirectHandler extends AbstractHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
String[] paths = target.split("/", 4);
int status = Integer.parseInt(paths[1]);
response.setStatus(status);
String host = paths[2];
response.setHeader("Location", request.getScheme() + "://" + host + ":" + request.getServerPort() + "/" + paths[3]);
String close = request.getParameter("close");
if (Boolean.parseBoolean(close))
response.setHeader("Connection", "close");
}
catch (NumberFormatException x)
{
response.setStatus(200);
// Echo content back
IO.copy(request.getInputStream(), response.getOutputStream());
}
finally
{
baseRequest.setHandled(true);
}
}
}
}

View File

@ -0,0 +1,76 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.junit.Assert;
import org.junit.Test;
import static java.nio.file.StandardOpenOption.CREATE;
public class HttpClientStreamTest extends AbstractHttpClientServerTest
{
@Test
public void testFileUpload() throws Exception
{
// Prepare a big file to upload
Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
Files.createDirectories(targetTestsDir);
Path upload = Paths.get(targetTestsDir.toString(), "http_client_upload.big");
try (OutputStream output = Files.newOutputStream(upload, CREATE))
{
byte[] kb = new byte[1024];
for (int i = 0; i < 10 * 1024; ++i)
output.write(kb);
}
start(new EmptyHandler());
final AtomicLong requestTime = new AtomicLong();
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.file(upload)
.listener(new Request.Listener.Adapter()
{
@Override
public void onSuccess(Request request)
{
requestTime.set(System.nanoTime());
}
})
.send()
.get(5, TimeUnit.SECONDS);
long responseTime = System.nanoTime();
Assert.assertEquals(200, response.status());
Assert.assertTrue(requestTime.get() <= responseTime);
// Give some time to the server to consume the request content
// This is just to avoid exception traces in the test output
Thread.sleep(1000);
}
}

View File

@ -19,25 +19,39 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpCookie; import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static java.nio.file.StandardOpenOption.CREATE;
public class HttpClientTest extends AbstractHttpClientServerTest public class HttpClientTest extends AbstractHttpClientServerTest
{ {
@Test @Test
@ -307,4 +321,138 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS)); Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
} }
@Slow
@Test
public void test_ExchangeIsComplete_OnlyWhenBothRequestAndResponseAreComplete() throws Exception
{
start(new EmptyHandler());
// Prepare a big file to upload
Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
Files.createDirectories(targetTestsDir);
Path file = Paths.get(targetTestsDir.toString(), "http_client_conversation.big");
try (OutputStream output = Files.newOutputStream(file, CREATE))
{
byte[] kb = new byte[1024];
for (int i = 0; i < 10 * 1024; ++i)
output.write(kb);
}
final CountDownLatch latch = new CountDownLatch(3);
final AtomicLong exchangeTime = new AtomicLong();
final AtomicLong requestTime = new AtomicLong();
final AtomicLong responseTime = new AtomicLong();
client.newRequest("localhost", connector.getLocalPort())
.file(file)
.listener(new org.eclipse.jetty.client.api.Request.Listener.Adapter()
{
@Override
public void onSuccess(org.eclipse.jetty.client.api.Request request)
{
requestTime.set(System.nanoTime());
latch.countDown();
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onSuccess(Response response)
{
responseTime.set(System.nanoTime());
latch.countDown();
}
@Override
public void onComplete(Result result)
{
exchangeTime.set(System.nanoTime());
latch.countDown();
}
});
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
Assert.assertTrue(requestTime.get() <= exchangeTime.get());
Assert.assertTrue(responseTime.get() <= exchangeTime.get());
// Give some time to the server to consume the request content
// This is just to avoid exception traces in the test output
Thread.sleep(1000);
Files.delete(file);
}
@Test
public void test_ExchangeIsComplete_WhenRequestFailsMidway_WithResponse() throws Exception
{
final int chunkSize = 16;
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
// Echo back
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
// The second ByteBuffer set to null will throw an exception
.content(new ContentProvider()
{
@Override
public long length()
{
return -1;
}
@Override
public Iterator<ByteBuffer> iterator()
{
return Arrays.asList(ByteBuffer.allocate(chunkSize), null).iterator();
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_ExchangeIsComplete_WhenRequestFails_WithNoResponse() throws Exception
{
start(new EmptyHandler());
final CountDownLatch latch = new CountDownLatch(1);
final String host = "localhost";
final int port = connector.getLocalPort();
client.newRequest(host, port)
.listener(new org.eclipse.jetty.client.api.Request.Listener.Adapter()
{
@Override
public void onBegin(org.eclipse.jetty.client.api.Request request)
{
HttpDestination destination = (HttpDestination)client.getDestination("http", host, port);
destination.getActiveConnections().peek().close();
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
} }

View File

@ -18,14 +18,22 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -48,7 +56,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(2); final CountDownLatch successLatch = new CountDownLatch(3);
client.newRequest(host, port) client.newRequest(host, port)
.listener(new Request.Listener.Adapter() .listener(new Request.Listener.Adapter()
{ {
@ -73,6 +81,13 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
{ {
successLatch.countDown(); successLatch.countDown();
} }
@Override
public void onComplete(Result result)
{
Assert.assertFalse(result.isFailed());
successLatch.countDown();
}
}); });
Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS));
@ -117,8 +132,9 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
}).send(new Response.Listener.Adapter() }).send(new Response.Listener.Adapter()
{ {
@Override @Override
public void onFailure(Response response, Throwable failure) public void onComplete(Result result)
{ {
Assert.assertTrue(result.isFailed());
Assert.assertEquals(0, idleConnections.size()); Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
failureLatch.countDown(); failureLatch.countDown();
@ -133,7 +149,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
} }
@Test @Test
public void test_BadRequest_ReturnsConnection() throws Exception public void test_BadRequest_RemovesConnection() throws Exception
{ {
start(new EmptyHandler()); start(new EmptyHandler());
@ -148,7 +164,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections(); final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
final CountDownLatch successLatch = new CountDownLatch(2); final CountDownLatch successLatch = new CountDownLatch(3);
client.newRequest(host, port) client.newRequest(host, port)
.listener(new Request.Listener.Adapter() .listener(new Request.Listener.Adapter()
{ {
@ -170,13 +186,23 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
@Override @Override
public void onSuccess(Response response) public void onSuccess(Response response)
{ {
Assert.assertEquals(400, response.status());
// 400 response also come with a Connection: close,
// so the connection is closed and removed
successLatch.countDown();
}
@Override
public void onComplete(Result result)
{
Assert.assertFalse(result.isFailed());
successLatch.countDown(); successLatch.countDown();
} }
}); });
Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(1, idleConnections.size()); Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
} }
@ -211,8 +237,9 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
.send(new Response.Listener.Adapter() .send(new Response.Listener.Adapter()
{ {
@Override @Override
public void onFailure(Response response, Throwable failure) public void onComplete(Result result)
{ {
Assert.assertTrue(result.isFailed());
failureLatch.countDown(); failureLatch.countDown();
} }
}); });
@ -222,4 +249,92 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
Assert.assertEquals(0, idleConnections.size()); Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size()); Assert.assertEquals(0, activeConnections.size());
} }
@Test
public void test_ResponseWithConnectionCloseHeader_RemovesConnection() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
response.setHeader("Connection", "close");
baseRequest.setHandled(true);
}
});
String scheme = "http";
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest(host, port)
.send(new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
Assert.assertFalse(result.isFailed());
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
}
@Test
public void test_BigRequestContent_ResponseWithConnectionCloseHeader_RemovesConnection() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
response.setHeader("Connection", "close");
baseRequest.setHandled(true);
}
});
String scheme = "http";
String host = "localhost";
int port = connector.getLocalPort();
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
final BlockingQueue<Connection> idleConnections = destination.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final BlockingQueue<Connection> activeConnections = destination.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest(host, port)
.content(new ByteBufferContentProvider(ByteBuffer.allocate(16 * 1024 * 1024)))
.send(new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
}
} }

View File

@ -76,12 +76,12 @@ public class HttpDestinationTest extends AbstractHttpClientServerTest
HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort()) HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort())
{ {
@Override @Override
protected void process(Connection connection) protected void process(Connection connection, boolean dispatch)
{ {
try try
{ {
latch.await(5, TimeUnit.SECONDS); latch.await(5, TimeUnit.SECONDS);
super.process(connection); super.process(connection, dispatch);
} }
catch (InterruptedException x) catch (InterruptedException x)
{ {

View File

@ -64,7 +64,7 @@ public class HttpReceiverTest
protected HttpExchange newExchange(Response.Listener listener) protected HttpExchange newExchange(Response.Listener listener)
{ {
HttpExchange exchange = new HttpExchange(conversation, connection, null, listener); HttpExchange exchange = new HttpExchange(conversation, connection, null, listener);
conversation.add(exchange); conversation.exchanges().offer(exchange);
connection.setExchange(exchange); connection.setExchange(exchange);
return exchange; return exchange;
} }

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.ByteBufferContentProvider; import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.io.ByteArrayEndPoint; import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.toolchain.test.annotation.Slow;
@ -131,8 +132,9 @@ public class HttpSenderTest
connection.send(request, new Response.Listener.Adapter() connection.send(request, new Response.Listener.Adapter()
{ {
@Override @Override
public void onFailure(Response response, Throwable failure) public void onComplete(Result result)
{ {
Assert.assertTrue(result.isFailed());
failureLatch.countDown(); failureLatch.countDown();
} }
}); });
@ -159,8 +161,9 @@ public class HttpSenderTest
connection.send(request, new Response.Listener.Adapter() connection.send(request, new Response.Listener.Adapter()
{ {
@Override @Override
public void onFailure(Response response, Throwable failure) public void onComplete(Result result)
{ {
Assert.assertTrue(result.isFailed());
failureLatch.countDown(); failureLatch.countDown();
} }
}); });

View File

@ -1,99 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class RedirectionTest extends AbstractHttpClientServerTest
{
@Before
public void init() throws Exception
{
start(new RedirectHandler());
}
@Test
public void test_303() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/localhost/303/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
@Test
public void test_303_302() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/localhost/303/localhost/302/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
@Test
public void test_303_302_OnDifferentDestinations() throws Exception
{
Response response = client.newRequest("localhost", connector.getLocalPort())
.path("/127.0.0.1/303/localhost/302/localhost/done")
.send().get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.status());
Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString()));
}
private class RedirectHandler extends AbstractHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
String[] paths = target.split("/", 4);
String host = paths[1];
int status = Integer.parseInt(paths[2]);
response.setStatus(status);
response.setHeader("Location", request.getScheme() + "://" + host + ":" + request.getServerPort() + "/" + paths[3]);
}
catch (NumberFormatException x)
{
response.setStatus(200);
}
finally
{
baseRequest.setHandled(true);
}
}
}
}

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.PathContentProvider; import org.eclipse.jetty.client.util.PathContentProvider;
import org.eclipse.jetty.client.util.StreamingResponseListener; import org.eclipse.jetty.client.util.StreamingResponseListener;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.junit.Assert; import org.junit.Assert;
@ -67,7 +68,6 @@ public class Usage
.param("a", "b") .param("a", "b")
.header("X-Header", "Y-value") .header("X-Header", "Y-value")
.agent("Jetty HTTP Client") .agent("Jetty HTTP Client")
.cookie("cookie1", "value1")
.decoder(null) .decoder(null)
.content(null) .content(null)
.idleTimeout(5000L); .idleTimeout(5000L);
@ -140,7 +140,8 @@ public class Usage
public void testCookie() throws Exception public void testCookie() throws Exception
{ {
HttpClient client = new HttpClient(); HttpClient client = new HttpClient();
Response response = client.newRequest("localhost", 8080).cookie("key", "value").send().get(); client.getCookieStore().addCookie(client.getDestination("http", "host", 8080), new HttpCookie("name", "value"));
Response response = client.newRequest("host", 8080).send().get();
Assert.assertEquals(200, response.status()); Assert.assertEquals(200, response.status());
} }

View File

@ -159,6 +159,12 @@ public abstract class AbstractEndPoint implements EndPoint
_writeFlusher.write(context, callback, buffers); _writeFlusher.write(context, callback, buffers);
} }
@Override
public boolean isBufferingOutput()
{
return false;
}
protected abstract void onIncompleteFlush(); protected abstract void onIncompleteFlush();
protected abstract boolean needsFill() throws IOException; protected abstract boolean needsFill() throws IOException;
@ -221,7 +227,6 @@ public abstract class AbstractEndPoint implements EndPoint
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x{%s<r-l>%s,o=%b,is=%b,os=%b,fi=%s,wf=%s}{%s}", return String.format("%s@%x{%s<r-l>%s,o=%b,is=%b,os=%b,fi=%s,wf=%s}{%s}",
getClass().getSimpleName(), getClass().getSimpleName(),
hashCode(), hashCode(),

View File

@ -241,4 +241,10 @@ public interface EndPoint extends Closeable
*/ */
void onClose(); void onClose();
/**
* @return True if the endpoint is buffering output.
*/
boolean isBufferingOutput();
} }

View File

@ -313,6 +313,17 @@ abstract public class WriteFlusher
} }
} }
// Handle buffering endpoint
if (_endPoint.isBufferingOutput())
{
PendingState<?> pending=new PendingState<>(buffers, context, callback);
if (updateState(__WRITING,pending))
onIncompleteFlushed();
else
fail(new PendingState<>(buffers, context, callback));
return;
}
// If updateState didn't succeed, we don't care as our buffers have been written // If updateState didn't succeed, we don't care as our buffers have been written
if (!updateState(__WRITING,__IDLE)) if (!updateState(__WRITING,__IDLE))
ignoreFail(); ignoreFail();
@ -372,6 +383,16 @@ abstract public class WriteFlusher
} }
} }
// Handle buffering endpoint
if (_endPoint.isBufferingOutput())
{
if (updateState(__COMPLETING,pending))
onIncompleteFlushed();
else
fail(pending);
return;
}
// If updateState didn't succeed, we don't care as our buffers have been written // If updateState didn't succeed, we don't care as our buffers have been written
if (!updateState(__COMPLETING,__IDLE)) if (!updateState(__COMPLETING,__IDLE))
ignoreFail(); ignoreFail();

View File

@ -244,8 +244,6 @@ public class SslConnection extends AbstractConnection
private boolean _cannotAcceptMoreAppDataToFlush; private boolean _cannotAcceptMoreAppDataToFlush;
private boolean _underFlown; private boolean _underFlown;
// TODO: use ExecutorCallback ?
// private final Callback<Void> _writeCallback = new ExecutorCallback<Void>(getExecutor())
private final Callback<Void> _writeCallback = new Callback<Void>() private final Callback<Void> _writeCallback = new Callback<Void>()
{ {
@Override @Override
@ -426,6 +424,12 @@ public class SslConnection extends AbstractConnection
super.setConnection(connection); super.setConnection(connection);
} }
@Override
public boolean isBufferingOutput()
{
return BufferUtil.hasContent(_encryptedOutput);
}
public SslConnection getSslConnection() public SslConnection getSslConnection()
{ {
return SslConnection.this; return SslConnection.this;

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
@ -36,6 +39,8 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler; import org.eclipse.jetty.util.thread.TimerScheduler;
@ -44,14 +49,9 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SelectChannelEndPointTest public class SelectChannelEndPointTest
{ {
private static final Logger LOG = Log.getLogger(SelectChannelEndPointTest.class);
protected CountDownLatch _lastEndPointLatch; protected CountDownLatch _lastEndPointLatch;
protected volatile EndPoint _lastEndPoint; protected volatile EndPoint _lastEndPoint;
protected ServerSocketChannel _connector; protected ServerSocketChannel _connector;
@ -179,6 +179,7 @@ public class SelectChannelEndPointTest
_endp.write(null, blockingWrite, out.asReadOnlyBuffer()); _endp.write(null, blockingWrite, out.asReadOnlyBuffer());
blockingWrite.get(); blockingWrite.get();
} }
LOG.info("Finished writing {}", _writeCount);
progress = true; progress = true;
} }
@ -602,6 +603,9 @@ public class SelectChannelEndPointTest
client.getOutputStream().write(data.getBytes("UTF-8")); client.getOutputStream().write(data.getBytes("UTF-8"));
BufferedInputStream in = new BufferedInputStream(client.getInputStream()); BufferedInputStream in = new BufferedInputStream(client.getInputStream());
int byteNum = 0;
try
{
for (int i = 0; i < _writeCount; i++) for (int i = 0; i < _writeCount; i++)
{ {
if (i % 1000 == 0) if (i % 1000 == 0)
@ -612,6 +616,7 @@ public class SelectChannelEndPointTest
{ {
char c = data.charAt(j); char c = data.charAt(j);
int b = in.read(); int b = in.read();
byteNum++;
assertTrue(b > 0); assertTrue(b > 0);
assertEquals("test-" + i + "/" + j,c,(char)b); assertEquals("test-" + i + "/" + j,c,(char)b);
} }
@ -619,6 +624,14 @@ public class SelectChannelEndPointTest
if (i == 0) if (i == 0)
_lastEndPoint.setIdleTimeout(60000); _lastEndPoint.setIdleTimeout(60000);
} }
}
catch (SocketTimeoutException e)
{
System.err.println("SelectorManager.dump() = " + _manager.dump());
LOG.warn("Server: " + server);
LOG.warn("Error reading byte #" + byteNum,e);
throw e;
}
client.close(); client.close();

View File

@ -1,2 +1,6 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# Enable the next two to debug SSL write blocked issue
org.eclipse.jetty.io.LEVEL=INFO org.eclipse.jetty.io.LEVEL=INFO
org.eclipse.jetty.io.SelectorManager.LEVEL=DEBUG
org.eclipse.jetty.io.SelectChannelEndPointTest.LEVEL=DEBUG

View File

@ -60,6 +60,12 @@ public class EmptyEndPoint implements EndPoint
oshut = true; oshut = true;
} }
@Override
public boolean isBufferingOutput()
{
return false;
}
@Override @Override
public boolean isOutputShutdown() public boolean isOutputShutdown()
{ {