mirror of https://github.com/apache/jclouds.git
issue 69: refactored http clients to not be bound to a single endpoint such that redirects can be assigned to another host
git-svn-id: http://jclouds.googlecode.com/svn/trunk@1456 3d8758e0-26b5-11de-8745-db77d3ebf521
This commit is contained in:
parent
39e96d041f
commit
48c3155450
|
@ -33,7 +33,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class FutureCommand<Q, R, T> implements Future<T> {
|
||||
public class FutureCommand<E, Q extends Request<E>, R, T> implements Future<T> {
|
||||
|
||||
private final Q request;
|
||||
private final ResponseRunnableFuture<R, T> responseRunnableFuture;
|
||||
|
|
|
@ -28,6 +28,6 @@ package org.jclouds.command;
|
|||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public interface FutureCommandClient<O extends FutureCommand<?, ?, ?>> {
|
||||
void submit(O operation);
|
||||
public interface FutureCommandClient<O extends FutureCommand<?, ?, ?, ?>> {
|
||||
void submit(O operation);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2009 Global Cloud Specialists, Inc. <info@globalcloudspecialists.com>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
package org.jclouds.command;
|
||||
|
||||
/**
|
||||
* A service request must have an endPoint associated with it.
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public interface Request<E> {
|
||||
E getEndPoint();
|
||||
|
||||
void setEndPoint(E endPoint);
|
||||
}
|
|
@ -32,76 +32,78 @@ import javax.annotation.Resource;
|
|||
import org.jclouds.command.FutureCommand;
|
||||
import org.jclouds.logging.Logger;
|
||||
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
/**
|
||||
* // TODO: Adrian: Document this!
|
||||
* Associates a command with an open connection to a service.
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public abstract class FutureCommandConnectionHandle<C, O extends FutureCommand<?, ?, ?>> {
|
||||
protected final BlockingQueue<C> available;
|
||||
protected final Semaphore maxConnections;
|
||||
protected final Semaphore completed;
|
||||
protected C conn;
|
||||
protected O command;
|
||||
@Resource
|
||||
protected Logger logger = Logger.NULL;
|
||||
public abstract class FutureCommandConnectionHandle<E, C, O extends FutureCommand<E, ?, ?, ?>> {
|
||||
protected final BlockingQueue<C> available;
|
||||
protected final Semaphore maxConnections;
|
||||
protected final Semaphore completed;
|
||||
protected final E endPoint;
|
||||
protected C conn;
|
||||
protected O command;
|
||||
@Resource
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
||||
public FutureCommandConnectionHandle(Semaphore maxConnections,
|
||||
@Assisted O command, @Assisted C conn, BlockingQueue<C> available)
|
||||
throws InterruptedException {
|
||||
this.maxConnections = maxConnections;
|
||||
this.command = command;
|
||||
this.conn = conn;
|
||||
this.available = available;
|
||||
this.completed = new Semaphore(1);
|
||||
completed.acquire();
|
||||
}
|
||||
public FutureCommandConnectionHandle(Semaphore maxConnections, BlockingQueue<C> available,
|
||||
E endPoint, O command, C conn) throws InterruptedException {
|
||||
this.available = available;
|
||||
this.maxConnections = maxConnections;
|
||||
this.completed = new Semaphore(1);
|
||||
this.endPoint = endPoint;
|
||||
this.command = command;
|
||||
this.conn = conn;
|
||||
completed.acquire();
|
||||
}
|
||||
|
||||
public O getCommand() {
|
||||
return command;
|
||||
}
|
||||
public O getCommand() {
|
||||
return command;
|
||||
}
|
||||
|
||||
public abstract void startConnection();
|
||||
public abstract void startConnection();
|
||||
|
||||
public boolean isCompleted() {
|
||||
return (completed.availablePermits() == 1);
|
||||
}
|
||||
public boolean isCompleted() {
|
||||
return (completed.availablePermits() == 1);
|
||||
}
|
||||
|
||||
public void release() throws InterruptedException {
|
||||
if (isCompleted()) {
|
||||
return;
|
||||
}
|
||||
logger.trace("%1$s - %2$d - releasing to pool", conn, conn.hashCode());
|
||||
available.put(conn);
|
||||
conn = null;
|
||||
command = null;
|
||||
completed.release();
|
||||
}
|
||||
public void release() throws InterruptedException {
|
||||
if (isCompleted() || alreadyReleased()) {
|
||||
return;
|
||||
}
|
||||
logger.trace("%1$s - %2$d - releasing to pool", conn, conn.hashCode());
|
||||
available.put(conn);
|
||||
conn = null;
|
||||
command = null;
|
||||
completed.release();
|
||||
}
|
||||
|
||||
public void cancel() throws IOException {
|
||||
if (isCompleted()) {
|
||||
return;
|
||||
}
|
||||
if (conn != null) {
|
||||
logger.trace("%1$s - %2$d - cancelled; shutting down connection",
|
||||
conn, conn.hashCode());
|
||||
try {
|
||||
shutdownConnection();
|
||||
} finally {
|
||||
conn = null;
|
||||
command = null;
|
||||
maxConnections.release();
|
||||
}
|
||||
}
|
||||
completed.release();
|
||||
}
|
||||
private boolean alreadyReleased() {
|
||||
return conn == null;
|
||||
}
|
||||
|
||||
public abstract void shutdownConnection() throws IOException;
|
||||
public void cancel() throws IOException {
|
||||
if (isCompleted()) {
|
||||
return;
|
||||
}
|
||||
if (conn != null) {
|
||||
logger.trace("%1$s - %2$d - cancelled; shutting down connection", conn, conn.hashCode());
|
||||
try {
|
||||
shutdownConnection();
|
||||
} finally {
|
||||
conn = null;
|
||||
command = null;
|
||||
maxConnections.release();
|
||||
}
|
||||
}
|
||||
completed.release();
|
||||
}
|
||||
|
||||
public void waitFor() throws InterruptedException {
|
||||
completed.acquire();
|
||||
completed.release();
|
||||
}
|
||||
public abstract void shutdownConnection() throws IOException;
|
||||
|
||||
public void waitFor() throws InterruptedException {
|
||||
completed.acquire();
|
||||
completed.release();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import org.jclouds.command.FutureCommand;
|
||||
import org.jclouds.lifecycle.BaseLifeCycle;
|
||||
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.google.inject.name.Named;
|
||||
|
||||
/**
|
||||
|
@ -41,130 +41,156 @@ import com.google.inject.name.Named;
|
|||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public abstract class FutureCommandConnectionPool<C, O extends FutureCommand<?, ?, ?>>
|
||||
extends BaseLifeCycle {
|
||||
protected final Semaphore allConnections;
|
||||
protected final BlockingQueue<C> available;
|
||||
protected final BlockingQueue<O> commandQueue;
|
||||
protected final FutureCommandConnectionHandleFactory<C, O> futureCommandConnectionHandleFactory;
|
||||
protected final int maxConnectionReuse;
|
||||
protected final AtomicInteger currentSessionFailures = new AtomicInteger(0);
|
||||
protected volatile boolean hitBottom = false;
|
||||
public abstract class FutureCommandConnectionPool<E, C, O extends FutureCommand<E, ?, ?, ?>>
|
||||
extends BaseLifeCycle {
|
||||
|
||||
public FutureCommandConnectionPool(
|
||||
ExecutorService executor,
|
||||
Semaphore allConnections,
|
||||
BlockingQueue<O> commandQueue,
|
||||
FutureCommandConnectionHandleFactory<C, O> futureCommandConnectionHandleFactory,
|
||||
@Named("maxConnectionReuse") int maxConnectionReuse,
|
||||
BlockingQueue<C> available, BaseLifeCycle... dependencies) {
|
||||
super(executor, dependencies);
|
||||
this.allConnections = allConnections;
|
||||
this.commandQueue = commandQueue;
|
||||
this.futureCommandConnectionHandleFactory = futureCommandConnectionHandleFactory;
|
||||
this.maxConnectionReuse = maxConnectionReuse;
|
||||
this.available = available;
|
||||
}
|
||||
protected final Semaphore allConnections;
|
||||
protected final BlockingQueue<C> available;
|
||||
|
||||
protected void setResponseException(Exception ex, C conn) {
|
||||
O command = getHandleFromConnection(conn).getCommand();
|
||||
command.getResponseFuture().setException(ex);
|
||||
}
|
||||
/**
|
||||
* inputOnly: nothing is taken from this queue.
|
||||
*/
|
||||
protected final BlockingQueue<O> resubmitQueue;
|
||||
protected final int maxConnectionReuse;
|
||||
protected final AtomicInteger currentSessionFailures = new AtomicInteger(0);
|
||||
protected volatile boolean hitBottom = false;
|
||||
protected final E endPoint;
|
||||
|
||||
protected void cancel(C conn) {
|
||||
O command = getHandleFromConnection(conn).getCommand();
|
||||
command.cancel(true);
|
||||
}
|
||||
public E getEndPoint() {
|
||||
return endPoint;
|
||||
}
|
||||
|
||||
@Provides
|
||||
public C getConnection() throws InterruptedException, TimeoutException {
|
||||
exceptionIfNotActive();
|
||||
if (!hitBottom) {
|
||||
hitBottom = available.size() == 0
|
||||
&& allConnections.availablePermits() == 0;
|
||||
if (hitBottom)
|
||||
logger.warn("%1$s - saturated connection pool", this);
|
||||
}
|
||||
logger
|
||||
.debug(
|
||||
"%1$s - attempting to acquire connection; %d currently available",
|
||||
this, available.size());
|
||||
C conn = available.poll(5, TimeUnit.SECONDS);
|
||||
if (conn == null)
|
||||
throw new TimeoutException(
|
||||
"could not obtain a pooled connection within 5 seconds");
|
||||
public static interface Factory<E, C, O extends FutureCommand<E, ?, ?, ?>> {
|
||||
FutureCommandConnectionPool<E, C, O> create(E endPoint);
|
||||
}
|
||||
|
||||
logger.trace("%1$s - %2$d - aquired", conn, conn.hashCode());
|
||||
if (connectionValid(conn)) {
|
||||
logger.debug("%1$s - %2$d - reusing", conn, conn.hashCode());
|
||||
return conn;
|
||||
} else {
|
||||
logger.debug("%1$s - %2$d - unusable", conn, conn.hashCode());
|
||||
shutdownConnection(conn);
|
||||
allConnections.release();
|
||||
return getConnection();
|
||||
}
|
||||
}
|
||||
public FutureCommandConnectionPool(ExecutorService executor, Semaphore allConnections,
|
||||
BlockingQueue<O> commandQueue, @Named("maxConnectionReuse") int maxConnectionReuse,
|
||||
BlockingQueue<C> available, @Assisted E endPoint, BaseLifeCycle... dependencies) {
|
||||
super(executor, dependencies);
|
||||
this.allConnections = allConnections;
|
||||
this.resubmitQueue = commandQueue;
|
||||
this.maxConnectionReuse = maxConnectionReuse;
|
||||
this.available = available;
|
||||
this.endPoint = endPoint;
|
||||
}
|
||||
|
||||
protected void fatalException(Exception ex, C conn) {
|
||||
setResponseException(ex, conn);
|
||||
exception.set(ex);
|
||||
shutdown();
|
||||
}
|
||||
protected void setResponseException(Exception ex, C conn) {
|
||||
O command = getHandleFromConnection(conn).getCommand();
|
||||
command.getResponseFuture().setException(ex);
|
||||
}
|
||||
|
||||
protected abstract void shutdownConnection(C conn);
|
||||
protected void cancel(C conn) {
|
||||
O command = getHandleFromConnection(conn).getCommand();
|
||||
command.cancel(true);
|
||||
}
|
||||
|
||||
protected abstract boolean connectionValid(C conn);
|
||||
protected C getConnection() throws InterruptedException, TimeoutException {
|
||||
exceptionIfNotActive();
|
||||
if (!hitBottom) {
|
||||
hitBottom = available.size() == 0 && allConnections.availablePermits() == 0;
|
||||
if (hitBottom)
|
||||
logger.warn("%1$s - saturated connection pool", this);
|
||||
}
|
||||
logger.debug("%s - attempting to acquire connection; %s currently available", this, available
|
||||
.size());
|
||||
C conn = available.poll(5, TimeUnit.SECONDS);
|
||||
if (conn == null)
|
||||
throw new TimeoutException("could not obtain a pooled connection within 5 seconds");
|
||||
|
||||
public FutureCommandConnectionHandle<C, O> getHandle(O command)
|
||||
throws InterruptedException, TimeoutException {
|
||||
exceptionIfNotActive();
|
||||
C conn = getConnection();
|
||||
FutureCommandConnectionHandle<C, O> handle = futureCommandConnectionHandleFactory
|
||||
.create(command, conn);
|
||||
associateHandleWithConnection(handle, conn);
|
||||
return handle;
|
||||
}
|
||||
logger.trace("%1$s - %2$d - aquired", conn, conn.hashCode());
|
||||
if (connectionValid(conn)) {
|
||||
logger.debug("%1$s - %2$d - reusing", conn, conn.hashCode());
|
||||
return conn;
|
||||
} else {
|
||||
logger.debug("%1$s - %2$d - unusable", conn, conn.hashCode());
|
||||
shutdownConnection(conn);
|
||||
allConnections.release();
|
||||
return getConnection();
|
||||
}
|
||||
}
|
||||
|
||||
protected void resubmitIfRequestIsReplayable(C connection, Exception e) {
|
||||
O command = getCommandFromConnection(connection);
|
||||
if (command != null) {
|
||||
if (isReplayable(command)) {
|
||||
logger.info("resubmitting command: %1$s", command);
|
||||
commandQueue.add(command);
|
||||
} else {
|
||||
command.setException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
protected void fatalException(Exception ex, C conn) {
|
||||
setResponseException(ex, conn);
|
||||
exception.set(ex);
|
||||
shutdown();
|
||||
}
|
||||
|
||||
protected abstract boolean isReplayable(O command);
|
||||
protected abstract void shutdownConnection(C conn);
|
||||
|
||||
O getCommandFromConnection(C connection) {
|
||||
FutureCommandConnectionHandle<C, O> handle = getHandleFromConnection(connection);
|
||||
if (handle != null && handle.getCommand() != null) {
|
||||
return handle.getCommand();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
protected abstract boolean connectionValid(C conn);
|
||||
|
||||
protected void setExceptionOnCommand(C connection, Exception e) {
|
||||
FutureCommand<?, ?, ?> command = getCommandFromConnection(connection);
|
||||
if (command != null) {
|
||||
logger.warn(e, "exception in command: %1$s", command);
|
||||
command.setException(e);
|
||||
}
|
||||
}
|
||||
public FutureCommandConnectionHandle<E, C, O> getHandle(O command) throws InterruptedException,
|
||||
TimeoutException {
|
||||
exceptionIfNotActive();
|
||||
C conn = getConnection();
|
||||
FutureCommandConnectionHandle<E, C, O> handle = createHandle(command, conn);
|
||||
associateHandleWithConnection(handle, conn);
|
||||
return handle;
|
||||
}
|
||||
|
||||
protected abstract void associateHandleWithConnection(
|
||||
FutureCommandConnectionHandle<C, O> handle, C connection);
|
||||
protected abstract FutureCommandConnectionHandle<E, C, O> createHandle(O command, C conn);
|
||||
|
||||
protected abstract FutureCommandConnectionHandle<C, O> getHandleFromConnection(
|
||||
C connection);
|
||||
protected void resubmitIfRequestIsReplayable(C connection, Exception e) {
|
||||
O command = getCommandFromConnection(connection);
|
||||
if (command != null) {
|
||||
if (isReplayable(command)) {
|
||||
logger.info("resubmitting command: %1$s", command);
|
||||
resubmitQueue.add(command);
|
||||
} else {
|
||||
command.setException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void createNewConnection() throws InterruptedException;
|
||||
protected abstract boolean isReplayable(O command);
|
||||
|
||||
O getCommandFromConnection(C connection) {
|
||||
FutureCommandConnectionHandle<E, C, O> handle = getHandleFromConnection(connection);
|
||||
if (handle != null && handle.getCommand() != null) {
|
||||
return handle.getCommand();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
protected void setExceptionOnCommand(C connection, Exception e) {
|
||||
FutureCommand<E, ?, ?, ?> command = getCommandFromConnection(connection);
|
||||
if (command != null) {
|
||||
logger.warn(e, "exception in command: %1$s", command);
|
||||
command.setException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void associateHandleWithConnection(
|
||||
FutureCommandConnectionHandle<E, C, O> handle, C connection);
|
||||
|
||||
protected abstract FutureCommandConnectionHandle<E, C, O> getHandleFromConnection(C connection);
|
||||
|
||||
protected abstract void createNewConnection() throws InterruptedException;
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + ((endPoint == null) ? 0 : endPoint.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
FutureCommandConnectionPool<?, ?, ?> other = (FutureCommandConnectionPool<?, ?, ?>) obj;
|
||||
if (endPoint == null) {
|
||||
if (other.endPoint != null)
|
||||
return false;
|
||||
} else if (!endPoint.equals(other.endPoint))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
public interface FutureCommandConnectionHandleFactory<C, O extends FutureCommand<?, ?, ?>> {
|
||||
FutureCommandConnectionHandle<C, O> create(O command, C conn);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
package org.jclouds.command.pool;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
@ -33,6 +34,8 @@ import org.jclouds.command.FutureCommandClient;
|
|||
import org.jclouds.lifecycle.BaseLifeCycle;
|
||||
import org.jclouds.util.Utils;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.MapMaker;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
|
@ -40,30 +43,30 @@ import com.google.inject.Inject;
|
|||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class FutureCommandConnectionPoolClient<C, O extends FutureCommand<?, ?, ?>> extends
|
||||
public class FutureCommandConnectionPoolClient<E, C, O extends FutureCommand<E, ?, ?, ?>> extends
|
||||
BaseLifeCycle implements FutureCommandClient<O> {
|
||||
private final FutureCommandConnectionPool<C, O> futureCommandConnectionPool;
|
||||
|
||||
private final ConcurrentMap<E, FutureCommandConnectionPool<E, C, O>> poolMap;
|
||||
private final BlockingQueue<O> commandQueue;
|
||||
private final FutureCommandConnectionPool.Factory<E, C, O> poolFactory;
|
||||
|
||||
@Inject
|
||||
public FutureCommandConnectionPoolClient(ExecutorService executor,
|
||||
FutureCommandConnectionPool<C, O> futureCommandConnectionPool,
|
||||
BlockingQueue<O> commandQueue) {
|
||||
super(executor, futureCommandConnectionPool);
|
||||
this.futureCommandConnectionPool = futureCommandConnectionPool;
|
||||
FutureCommandConnectionPool.Factory<E, C, O> pf, BlockingQueue<O> commandQueue) {
|
||||
super(executor);
|
||||
this.poolFactory = pf;
|
||||
// TODO inject this.
|
||||
poolMap = new MapMaker()
|
||||
.makeComputingMap(new Function<E, FutureCommandConnectionPool<E, C, O>>() {
|
||||
public FutureCommandConnectionPool<E, C, O> apply(E endPoint) {
|
||||
FutureCommandConnectionPool<E, C, O> pool = poolFactory.create(endPoint);
|
||||
addDependency(pool);
|
||||
return pool;
|
||||
}
|
||||
});
|
||||
this.commandQueue = commandQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
* <p/>
|
||||
* we continue while the connection pool is active
|
||||
*/
|
||||
@Override
|
||||
protected boolean shouldDoWork() {
|
||||
return super.shouldDoWork() && futureCommandConnectionPool.getStatus().equals(Status.ACTIVE);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
|
@ -72,9 +75,9 @@ public class FutureCommandConnectionPoolClient<C, O extends FutureCommand<?, ?,
|
|||
*/
|
||||
@Override
|
||||
protected void doShutdown() {
|
||||
exception.compareAndSet(null, futureCommandConnectionPool.getException());
|
||||
exception.compareAndSet(null, getExceptionFromDependenciesOrNull());
|
||||
while (!commandQueue.isEmpty()) {
|
||||
FutureCommand<?, ?, ?> command = (FutureCommand<?, ?, ?>) commandQueue.remove();
|
||||
FutureCommand<E, ?, ?, ?> command = (FutureCommand<E, ?, ?, ?>) commandQueue.remove();
|
||||
if (command != null) {
|
||||
if (exception.get() != null)
|
||||
command.setException(exception.get());
|
||||
|
@ -119,21 +122,29 @@ public class FutureCommandConnectionPoolClient<C, O extends FutureCommand<?, ?,
|
|||
*/
|
||||
protected void invoke(O command) {
|
||||
exceptionIfNotActive();
|
||||
FutureCommandConnectionHandle<C, O> connectionHandle = null;
|
||||
FutureCommandConnectionPool<E, C, O> pool = poolMap.get(command.getRequest().getEndPoint());
|
||||
if (pool == null) {
|
||||
//TODO limit;
|
||||
logger.warn("pool not available for command %s; retrying", command);
|
||||
commandQueue.add(command);
|
||||
return;
|
||||
}
|
||||
|
||||
FutureCommandConnectionHandle<E, C, O> connectionHandle = null;
|
||||
try {
|
||||
connectionHandle = futureCommandConnectionPool.getHandle(command);
|
||||
connectionHandle = pool.getHandle(command);
|
||||
} catch (InterruptedException e) {
|
||||
logger.warn(e, "Interrupted getting a connection for command %1$s; retrying", command);
|
||||
logger.warn(e, "Interrupted getting a connection for command %s; retrying", command);
|
||||
commandQueue.add(command);
|
||||
return;
|
||||
} catch (TimeoutException e) {
|
||||
logger.warn(e, "Timeout getting a connection for command %1$s; retrying", command);
|
||||
logger.warn(e, "Timeout getting a connection for command %s on pool %s; retrying", command, pool);
|
||||
commandQueue.add(command);
|
||||
return;
|
||||
}
|
||||
|
||||
if (connectionHandle == null) {
|
||||
logger.error("Failed to obtain connection for command %1$s; retrying", command);
|
||||
logger.error("Failed to obtain connection for command %s; retrying", command);
|
||||
commandQueue.add(command);
|
||||
return;
|
||||
}
|
||||
|
@ -146,7 +157,7 @@ public class FutureCommandConnectionPoolClient<C, O extends FutureCommand<?, ?,
|
|||
sb.append("FutureCommandConnectionPoolClient");
|
||||
sb.append("{status=").append(status);
|
||||
sb.append(", commandQueue=").append((commandQueue != null) ? commandQueue.size() : 0);
|
||||
sb.append(", futureCommandConnectionPool=").append(futureCommandConnectionPool);
|
||||
sb.append(", poolMap=").append(poolMap);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.jclouds.lifecycle.config.LifeCycleModule;
|
|||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.Singleton;
|
||||
import com.google.inject.name.Named;
|
||||
|
||||
/**
|
||||
|
@ -47,7 +46,7 @@ public abstract class FutureCommandConnectionPoolClientModule<C> extends
|
|||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
// @Singleton per uri...
|
||||
public abstract BlockingQueue<C> provideAvailablePool(
|
||||
@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max)
|
||||
throws Exception;
|
||||
|
@ -62,7 +61,7 @@ public abstract class FutureCommandConnectionPoolClientModule<C> extends
|
|||
* @throws Exception
|
||||
*/
|
||||
@Provides
|
||||
@Singleton
|
||||
// @Singleton per uri...
|
||||
public Semaphore provideTotalConnectionSemaphore(
|
||||
@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max)
|
||||
throws Exception {
|
||||
|
|
|
@ -30,9 +30,9 @@ package org.jclouds.http;
|
|||
*/
|
||||
public interface HttpErrorHandler {
|
||||
public static final HttpErrorHandler NOOP = new HttpErrorHandler() {
|
||||
public void handle(HttpFutureCommand<?> command, HttpResponse response) {
|
||||
public void handleError(HttpFutureCommand<?> command, HttpResponse response) {
|
||||
}
|
||||
};
|
||||
|
||||
void handle(HttpFutureCommand<?> command, HttpResponse response);
|
||||
void handleError(HttpFutureCommand<?> command, HttpResponse response);
|
||||
}
|
||||
|
|
|
@ -39,7 +39,16 @@ import org.jclouds.logging.Logger;
|
|||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class HttpFutureCommand<T> extends FutureCommand<HttpRequest, HttpResponse, T> {
|
||||
public class HttpFutureCommand<T> extends FutureCommand<URI, HttpRequest, HttpResponse, T> {
|
||||
private volatile int redirectCount;
|
||||
|
||||
public int incrementRedirectCount() {
|
||||
return ++redirectCount;
|
||||
}
|
||||
|
||||
public int getRedirectCount() {
|
||||
return redirectCount;
|
||||
}
|
||||
|
||||
public HttpFutureCommand(URI endPoint, HttpMethod method, String uri,
|
||||
ResponseCallable<T> responseCallable) {
|
||||
|
|
|
@ -25,65 +25,61 @@ package org.jclouds.http;
|
|||
|
||||
public interface HttpHeaders {
|
||||
|
||||
/**
|
||||
* Can be used to specify caching behavior along the request/reply chain. Go
|
||||
* to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html?sec14.9.
|
||||
*/
|
||||
public static final String CACHE_CONTROL = "Cache-Control";
|
||||
/**
|
||||
* Specifies presentational information for the object. Go to
|
||||
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec19.html?sec19.5.1.
|
||||
*/
|
||||
public static final String CONTENT_DISPOSITION = "Content-Disposition";
|
||||
/**
|
||||
* Specifies what content encodings have been applied to the object and thus
|
||||
* what decoding mechanisms must be applied in order to obtain the
|
||||
* media-type referenced by the Content-Type header field. Go to
|
||||
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html?sec14.11.
|
||||
*/
|
||||
public static final String CONTENT_ENCODING = "Content-Encoding";
|
||||
/**
|
||||
* The size of the object, in bytes. This is required. Go to
|
||||
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html?sec14.13.
|
||||
*/
|
||||
public static final String CONTENT_LENGTH = "Content-Length";
|
||||
/**
|
||||
* A standard MIME type describing the format of the contents. If none is
|
||||
* provided, the default is binary/octet-stream. Go to
|
||||
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html?sec14.17.
|
||||
*/
|
||||
public static final String CONTENT_TYPE = "Content-Type";
|
||||
/**
|
||||
* The base64 encoded 128-bit MD5 digest of the message (without the
|
||||
* headers) according to RFC 1864. This header can be used as a message
|
||||
* integrity check to verify that the data is the same data that was
|
||||
* originally sent.
|
||||
*/
|
||||
public static final String CONTENT_MD5 = "Content-MD5";
|
||||
/**
|
||||
* A user agent that wishes to authenticate itself with a server-- usually,
|
||||
* but not necessarily, after receiving a 401 response--does so by including
|
||||
* an Authorization request-header field with the request. The Authorization
|
||||
* field value consists of credentials containing the authentication
|
||||
* information of the user agent for the realm of the resource being
|
||||
* requested.
|
||||
*
|
||||
* Authorization = "Authorization" ":" credentials
|
||||
*
|
||||
* @see <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html" />
|
||||
*/
|
||||
public static final String AUTHORIZATION = "Authorization";
|
||||
public static final String HOST = "Host";
|
||||
public static final String DATE = "Date";
|
||||
public static final String TRANSFER_ENCODING = "Transfer-Encoding";
|
||||
public static final String LAST_MODIFIED = "Last-Modified";
|
||||
public static final String SERVER = "Server";
|
||||
public static final String ETAG = "ETag";
|
||||
public static final String RANGE = "Range";
|
||||
public static final String IF_MODIFIED_SINCE = "If-Modified-Since";
|
||||
public static final String IF_UNMODIFIED_SINCE = "If-Unmodified-Since";
|
||||
public static final String IF_MATCH = "If-Match";
|
||||
public static final String IF_NONE_MATCH = "If-None-Match";
|
||||
public static final String CONTENT_RANGE = "Content-Range";
|
||||
/**
|
||||
* Can be used to specify caching behavior along the request/reply chain. Go to
|
||||
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html?sec14.9.
|
||||
*/
|
||||
public static final String CACHE_CONTROL = "Cache-Control";
|
||||
/**
|
||||
* Specifies presentational information for the object. Go to
|
||||
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec19.html?sec19.5.1.
|
||||
*/
|
||||
public static final String CONTENT_DISPOSITION = "Content-Disposition";
|
||||
/**
|
||||
* Specifies what content encodings have been applied to the object and thus what decoding
|
||||
* mechanisms must be applied in order to obtain the media-type referenced by the Content-Type
|
||||
* header field. Go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html?sec14.11.
|
||||
*/
|
||||
public static final String CONTENT_ENCODING = "Content-Encoding";
|
||||
/**
|
||||
* The size of the object, in bytes. This is required. Go to
|
||||
* http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html?sec14.13.
|
||||
*/
|
||||
public static final String CONTENT_LENGTH = "Content-Length";
|
||||
/**
|
||||
* A standard MIME type describing the format of the contents. If none is provided, the default
|
||||
* is binary/octet-stream. Go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html?sec14.17.
|
||||
*/
|
||||
public static final String CONTENT_TYPE = "Content-Type";
|
||||
/**
|
||||
* The base64 encoded 128-bit MD5 digest of the message (without the headers) according to RFC
|
||||
* 1864. This header can be used as a message integrity check to verify that the data is the same
|
||||
* data that was originally sent.
|
||||
*/
|
||||
public static final String CONTENT_MD5 = "Content-MD5";
|
||||
/**
|
||||
* A user agent that wishes to authenticate itself with a server-- usually, but not necessarily,
|
||||
* after receiving a 401 response--does so by including an Authorization request-header field
|
||||
* with the request. The Authorization field value consists of credentials containing the
|
||||
* authentication information of the user agent for the realm of the resource being requested.
|
||||
*
|
||||
* Authorization = "Authorization" ":" credentials
|
||||
*
|
||||
* @see <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html" />
|
||||
*/
|
||||
public static final String AUTHORIZATION = "Authorization";
|
||||
public static final String HOST = "Host";
|
||||
public static final String DATE = "Date";
|
||||
public static final String TRANSFER_ENCODING = "Transfer-Encoding";
|
||||
public static final String LAST_MODIFIED = "Last-Modified";
|
||||
public static final String SERVER = "Server";
|
||||
public static final String ETAG = "ETag";
|
||||
public static final String RANGE = "Range";
|
||||
public static final String IF_MODIFIED_SINCE = "If-Modified-Since";
|
||||
public static final String IF_UNMODIFIED_SINCE = "If-Unmodified-Since";
|
||||
public static final String IF_MATCH = "If-Match";
|
||||
public static final String IF_NONE_MATCH = "If-None-Match";
|
||||
public static final String CONTENT_RANGE = "Content-Range";
|
||||
public static final String LOCATION = "Location";
|
||||
|
||||
}
|
|
@ -30,6 +30,7 @@ import java.net.URI;
|
|||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import org.jclouds.command.Request;
|
||||
import org.jclouds.logging.Logger;
|
||||
import org.jclouds.util.Utils;
|
||||
|
||||
|
@ -38,7 +39,7 @@ import org.jclouds.util.Utils;
|
|||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class HttpRequest extends HttpMessage {
|
||||
public class HttpRequest extends HttpMessage implements Request<URI> {
|
||||
|
||||
private URI endPoint;
|
||||
private final HttpMethod method;
|
||||
|
@ -100,10 +101,16 @@ public class HttpRequest extends HttpMessage {
|
|||
this.payload = content;
|
||||
}
|
||||
|
||||
/**
|
||||
* only the scheme, host, and port of the URI designates the endpoint
|
||||
*/
|
||||
public void setEndPoint(URI endPoint) {
|
||||
this.endPoint = endPoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* only the scheme, host, and port of the URI designates the endpoint
|
||||
*/
|
||||
public URI getEndPoint() {
|
||||
return endPoint;
|
||||
}
|
||||
|
|
|
@ -24,25 +24,21 @@
|
|||
package org.jclouds.http;
|
||||
|
||||
/**
|
||||
* Indicate whether a request should be retried after a server
|
||||
* error response (HTTP status code >= 500) based on the request's
|
||||
* replayable status and the number of attempts already performed.
|
||||
* Indicate whether a request should be retried after a server error response (HTTP status code >=
|
||||
* 500) based on the request's replayable status and the number of attempts already performed.
|
||||
*
|
||||
* @author James Murty
|
||||
*/
|
||||
public interface HttpRetryHandler {
|
||||
public static final HttpRetryHandler ALWAYS_RETRY = new HttpRetryHandler() {
|
||||
public boolean shouldRetryRequest(HttpFutureCommand<?> command, HttpResponse response)
|
||||
{
|
||||
public boolean shouldRetryRequest(HttpFutureCommand<?> command, HttpResponse response) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Return true if the command should be retried. This method should only be
|
||||
* invoked when the response has failed with a HTTP 5xx error indicating a
|
||||
* server-side error.
|
||||
* Return true if the command should be retried. This method should only be invoked when the
|
||||
* response has failed with a HTTP 5xx error indicating a server-side error.
|
||||
*/
|
||||
boolean shouldRetryRequest(HttpFutureCommand<?> command, HttpResponse response)
|
||||
throws InterruptedException;
|
||||
boolean shouldRetryRequest(HttpFutureCommand<?> command, HttpResponse response);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2009 Adrian Cole <adrian@jclouds.org>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
package org.jclouds.http.annotation;
|
||||
|
||||
import com.google.inject.BindingAnnotation;
|
||||
import java.lang.annotation.Target;
|
||||
import java.lang.annotation.Retention;
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
import static java.lang.annotation.ElementType.PARAMETER;
|
||||
import static java.lang.annotation.ElementType.FIELD;
|
||||
import static java.lang.annotation.ElementType.METHOD;
|
||||
|
||||
/**
|
||||
* Implies that the object can address {@link org.jclouds.http.HttpResponse}s
|
||||
* that contain status code 4xx.
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@BindingAnnotation
|
||||
@Target( { FIELD, PARAMETER, METHOD })
|
||||
@Retention(RUNTIME)
|
||||
public @interface ClientError {
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2009 Adrian Cole <adrian@jclouds.org>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
package org.jclouds.http.annotation;
|
||||
|
||||
import com.google.inject.BindingAnnotation;
|
||||
import java.lang.annotation.Target;
|
||||
import java.lang.annotation.Retention;
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
import static java.lang.annotation.ElementType.PARAMETER;
|
||||
import static java.lang.annotation.ElementType.FIELD;
|
||||
import static java.lang.annotation.ElementType.METHOD;
|
||||
|
||||
/**
|
||||
* Implies that the object can address {@link org.jclouds.http.HttpResponse}s that contain status
|
||||
* code 3xx.
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@BindingAnnotation
|
||||
@Target( { FIELD, PARAMETER, METHOD })
|
||||
@Retention(RUNTIME)
|
||||
public @interface Redirection {
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2009 Adrian Cole <adrian@jclouds.org>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
package org.jclouds.http.annotation;
|
||||
|
||||
import com.google.inject.BindingAnnotation;
|
||||
import java.lang.annotation.Target;
|
||||
import java.lang.annotation.Retention;
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
import static java.lang.annotation.ElementType.PARAMETER;
|
||||
import static java.lang.annotation.ElementType.FIELD;
|
||||
import static java.lang.annotation.ElementType.METHOD;
|
||||
|
||||
/**
|
||||
* Implies that the object can address {@link org.jclouds.http.HttpResponse}s that contain status
|
||||
* code 5xx.
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
@BindingAnnotation
|
||||
@Target( { FIELD, PARAMETER, METHOD })
|
||||
@Retention(RUNTIME)
|
||||
public @interface ServerError {
|
||||
}
|
|
@ -23,17 +23,10 @@
|
|||
*/
|
||||
package org.jclouds.http.config;
|
||||
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.jclouds.http.HttpConstants;
|
||||
import org.jclouds.http.HttpFutureCommandClient;
|
||||
import org.jclouds.http.internal.JavaUrlHttpFutureCommandClient;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.Singleton;
|
||||
import com.google.inject.name.Named;
|
||||
|
||||
/**
|
||||
* Configures {@link JavaUrlHttpFutureCommandClient}.
|
||||
|
@ -53,15 +46,4 @@ public class JavaUrlHttpFutureCommandClientModule extends AbstractModule {
|
|||
bind(HttpFutureCommandClient.class).to(JavaUrlHttpFutureCommandClient.class);
|
||||
}
|
||||
|
||||
@Singleton
|
||||
@Provides
|
||||
protected URI provideAddress(@Named(HttpConstants.PROPERTY_HTTP_ADDRESS) String address,
|
||||
@Named(HttpConstants.PROPERTY_HTTP_PORT) int port,
|
||||
@Named(HttpConstants.PROPERTY_HTTP_SECURE) boolean isSecure)
|
||||
throws MalformedURLException {
|
||||
|
||||
return URI.create(String.format("%1$s://%2$s:%3$s", isSecure ? "https" : "http", address,
|
||||
port));
|
||||
}
|
||||
|
||||
}
|
|
@ -35,24 +35,42 @@ import com.google.inject.Inject;
|
|||
import com.google.inject.name.Named;
|
||||
|
||||
/**
|
||||
* Allow replayable request to be retried a limited number of times, and
|
||||
* impose an exponential back-off delay before returning.
|
||||
* Allow replayable request to be retried a limited number of times, and impose an exponential
|
||||
* back-off delay before returning.
|
||||
* <p>
|
||||
* The back-off delay grows rapidly according to the formula
|
||||
* <code>50 * (<i>{@link HttpFutureCommand#getFailureCount()}</i> ^ 2)</code>. For example:
|
||||
* <table>
|
||||
* <tr><th>Number of Attempts</th><th>Delay in milliseconds</th></tr>
|
||||
* <tr><td>1</td><td>50</td></tr>
|
||||
* <tr><td>2</td><td>200</td></tr>
|
||||
* <tr><td>3</td><td>450</td></tr>
|
||||
* <tr><td>4</td><td>800</td></tr>
|
||||
* <tr><td>5</td><td>1250</td></tr>
|
||||
* <tr>
|
||||
* <th>Number of Attempts</th>
|
||||
* <th>Delay in milliseconds</th>
|
||||
* </tr>
|
||||
* <tr>
|
||||
* <td>1</td>
|
||||
* <td>50</td>
|
||||
* </tr>
|
||||
* <tr>
|
||||
* <td>2</td>
|
||||
* <td>200</td>
|
||||
* </tr>
|
||||
* <tr>
|
||||
* <td>3</td>
|
||||
* <td>450</td>
|
||||
* </tr>
|
||||
* <tr>
|
||||
* <td>4</td>
|
||||
* <td>800</td>
|
||||
* </tr>
|
||||
* <tr>
|
||||
* <td>5</td>
|
||||
* <td>1250</td>
|
||||
* </tr>
|
||||
* </table>
|
||||
* <p>
|
||||
* This implementation has two side-effects. It increments the command's failure count
|
||||
* with {@link HttpFutureCommand#incrementFailureCount()}, because this failure count
|
||||
* value is used to determine how many times the command has already been tried. It
|
||||
* also closes the response's content input stream to ensure connections are cleaned up.
|
||||
* This implementation has two side-effects. It increments the command's failure count with
|
||||
* {@link HttpFutureCommand#incrementFailureCount()}, because this failure count value is used to
|
||||
* determine how many times the command has already been tried. It also closes the response's
|
||||
* content input stream to ensure connections are cleaned up.
|
||||
*
|
||||
* @author James Murty
|
||||
*/
|
||||
|
@ -67,9 +85,7 @@ public class BackoffLimitedRetryHandler implements HttpRetryHandler {
|
|||
this.retryCountLimit = retryCountLimit;
|
||||
}
|
||||
|
||||
public boolean shouldRetryRequest(HttpFutureCommand<?> command, HttpResponse response)
|
||||
throws InterruptedException
|
||||
{
|
||||
public boolean shouldRetryRequest(HttpFutureCommand<?> command, HttpResponse response) {
|
||||
IOUtils.closeQuietly(response.getContent());
|
||||
|
||||
command.incrementFailureCount();
|
||||
|
@ -78,22 +94,25 @@ public class BackoffLimitedRetryHandler implements HttpRetryHandler {
|
|||
logger.warn("Cannot retry after server error, command is not replayable: %1$s", command);
|
||||
return false;
|
||||
} else if (command.getFailureCount() > retryCountLimit) {
|
||||
logger.warn("Cannot retry after server error, command has exceeded retry limit %1$d: %2$s",
|
||||
retryCountLimit, command);
|
||||
logger.warn(
|
||||
"Cannot retry after server error, command has exceeded retry limit %1$d: %2$s",
|
||||
retryCountLimit, command);
|
||||
return false;
|
||||
} else {
|
||||
imposeBackoffExponentialDelay(command.getFailureCount(), command.toString());
|
||||
return true;
|
||||
imposeBackoffExponentialDelay(command.getFailureCount(), command.toString());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void imposeBackoffExponentialDelay(int failureCound, String commandDescription)
|
||||
throws InterruptedException
|
||||
{
|
||||
public void imposeBackoffExponentialDelay(int failureCound, String commandDescription) {
|
||||
long delayMs = (long) (50L * Math.pow(failureCound, 2));
|
||||
logger.debug("Retry %1$d/%2$d after server error, delaying for %3$d ms: %4$s",
|
||||
failureCound, retryCountLimit, delayMs, commandDescription);
|
||||
Thread.sleep(delayMs);
|
||||
logger.debug("Retry %1$d/%2$d after server error, delaying for %3$d ms: %4$s", failureCound,
|
||||
retryCountLimit, delayMs, commandDescription);
|
||||
try {
|
||||
Thread.sleep(delayMs);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error(e, "Interrupted imposing delay");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2009 Global Cloud Specialists, Inc. <info@globalcloudspecialists.com>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
package org.jclouds.http.handlers;
|
||||
|
||||
import org.jclouds.http.HttpFutureCommand;
|
||||
import org.jclouds.http.HttpResponse;
|
||||
import org.jclouds.http.HttpRetryHandler;
|
||||
|
||||
/**
|
||||
* Always returns false.
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class CannotRetryHandler implements HttpRetryHandler {
|
||||
|
||||
public boolean shouldRetryRequest(HttpFutureCommand<?> command, HttpResponse response) {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -25,19 +25,19 @@ package org.jclouds.http.handlers;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.jclouds.http.HttpErrorHandler;
|
||||
import org.jclouds.http.HttpFutureCommand;
|
||||
import org.jclouds.http.HttpResponse;
|
||||
import org.jclouds.http.HttpResponseException;
|
||||
import org.jclouds.http.HttpErrorHandler;
|
||||
import org.jclouds.util.Utils;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class CloseContentAndSetExceptionHandler implements HttpErrorHandler {
|
||||
public class CloseContentAndSetExceptionErrorHandler implements HttpErrorHandler {
|
||||
|
||||
public void handle(HttpFutureCommand<?> command, HttpResponse response) {
|
||||
public void handleError(HttpFutureCommand<?> command, HttpResponse response) {
|
||||
String content;
|
||||
try {
|
||||
content = response.getContent() != null ? Utils.toStringAndClose(response.getContent())
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2009 Global Cloud Specialists, Inc. <info@globalcloudspecialists.com>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
package org.jclouds.http.handlers;
|
||||
|
||||
import org.jclouds.http.HttpErrorHandler;
|
||||
import org.jclouds.http.HttpFutureCommand;
|
||||
import org.jclouds.http.annotation.ClientError;
|
||||
import org.jclouds.http.annotation.Redirection;
|
||||
import org.jclouds.http.annotation.ServerError;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
* Delegates to {@link HttpErrorHandler HttpErrorHandlers} who are annotated according to the
|
||||
* response codes they relate to.
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class DelegatingErrorHandler implements HttpErrorHandler {
|
||||
|
||||
@Redirection
|
||||
@Inject(optional = true)
|
||||
private HttpErrorHandler redirectionHandler = new CloseContentAndSetExceptionErrorHandler();
|
||||
|
||||
@ClientError
|
||||
@Inject(optional = true)
|
||||
private HttpErrorHandler clientErrorHandler = new CloseContentAndSetExceptionErrorHandler();
|
||||
|
||||
@ServerError
|
||||
@Inject(optional = true)
|
||||
private HttpErrorHandler serverErrorHandler = new CloseContentAndSetExceptionErrorHandler();
|
||||
|
||||
public void handleError(HttpFutureCommand<?> command, org.jclouds.http.HttpResponse response) {
|
||||
int statusCode = response.getStatusCode();
|
||||
if (statusCode >= 300 && statusCode < 400) {
|
||||
redirectionHandler.handleError(command, response);
|
||||
} else if (statusCode >= 400 && statusCode < 500) {
|
||||
clientErrorHandler.handleError(command, response);
|
||||
} else if (statusCode >= 500) {
|
||||
serverErrorHandler.handleError(command, response);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2009 Global Cloud Specialists, Inc. <info@globalcloudspecialists.com>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
package org.jclouds.http.handlers;
|
||||
|
||||
import org.jclouds.http.HttpFutureCommand;
|
||||
import org.jclouds.http.HttpRetryHandler;
|
||||
import org.jclouds.http.annotation.ClientError;
|
||||
import org.jclouds.http.annotation.Redirection;
|
||||
import org.jclouds.http.annotation.ServerError;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
||||
/**
|
||||
* Delegates to {@link HttpRetryHandler HttpRetryHandlers} who are annotated according to the
|
||||
* response codes they relate to.
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class DelegatingRetryHandler implements HttpRetryHandler {
|
||||
|
||||
@Redirection
|
||||
@Inject(optional = true)
|
||||
private HttpRetryHandler redirectionRetryHandler = new RedirectionRetryHandler(5);
|
||||
|
||||
@ClientError
|
||||
@Inject(optional = true)
|
||||
private HttpRetryHandler clientErrorRetryHandler = new CannotRetryHandler();
|
||||
|
||||
@ServerError
|
||||
@Inject(optional = true)
|
||||
private HttpRetryHandler serverErrorRetryHandler = new BackoffLimitedRetryHandler(5);
|
||||
|
||||
public boolean shouldRetryRequest(HttpFutureCommand<?> command,
|
||||
org.jclouds.http.HttpResponse response) {
|
||||
int statusCode = response.getStatusCode();
|
||||
boolean retryRequest = false;
|
||||
if (statusCode >= 300 && statusCode < 400) {
|
||||
retryRequest = redirectionRetryHandler.shouldRetryRequest(command, response);
|
||||
} else if (statusCode >= 400 && statusCode < 500) {
|
||||
retryRequest = clientErrorRetryHandler.shouldRetryRequest(command, response);
|
||||
} else if (statusCode >= 500) {
|
||||
retryRequest = serverErrorRetryHandler.shouldRetryRequest(command, response);
|
||||
}
|
||||
return retryRequest;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
*
|
||||
* Copyright (C) 2009 Global Cloud Specialists, Inc. <info@globalcloudspecialists.com>
|
||||
*
|
||||
* ====================================================================
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
* ====================================================================
|
||||
*/
|
||||
package org.jclouds.http.handlers;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.jclouds.http.HttpFutureCommand;
|
||||
import org.jclouds.http.HttpHeaders;
|
||||
import org.jclouds.http.HttpResponse;
|
||||
import org.jclouds.http.HttpRetryHandler;
|
||||
import org.jclouds.logging.Logger;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.name.Named;
|
||||
|
||||
/**
|
||||
* Handles Retryable responses with error codes in the 3xx range
|
||||
*
|
||||
* @author Adrian Cole
|
||||
*/
|
||||
public class RedirectionRetryHandler implements HttpRetryHandler {
|
||||
private final int retryCountLimit;
|
||||
|
||||
@Resource
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
||||
@Inject
|
||||
public RedirectionRetryHandler(@Named("jclouds.http.max-redirects") int retryCountLimit) {
|
||||
this.retryCountLimit = retryCountLimit;
|
||||
}
|
||||
|
||||
public boolean shouldRetryRequest(HttpFutureCommand<?> command, HttpResponse response) {
|
||||
IOUtils.closeQuietly(response.getContent());
|
||||
command.incrementRedirectCount();
|
||||
|
||||
String hostHeader = response.getFirstHeaderOrNull(HttpHeaders.LOCATION);
|
||||
if (hostHeader != null && command.getRedirectCount() < retryCountLimit) {
|
||||
URI redirectURI = URI.create(hostHeader);
|
||||
command.getRequest().setEndPoint(redirectURI);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -36,8 +36,8 @@ import org.jclouds.http.HttpRequest;
|
|||
import org.jclouds.http.HttpRequestFilter;
|
||||
import org.jclouds.http.HttpResponse;
|
||||
import org.jclouds.http.HttpRetryHandler;
|
||||
import org.jclouds.http.handlers.BackoffLimitedRetryHandler;
|
||||
import org.jclouds.http.handlers.CloseContentAndSetExceptionHandler;
|
||||
import org.jclouds.http.handlers.DelegatingErrorHandler;
|
||||
import org.jclouds.http.handlers.DelegatingRetryHandler;
|
||||
import org.jclouds.logging.Logger;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
|
@ -51,10 +51,10 @@ public abstract class BaseHttpFutureCommandClient<Q> implements HttpFutureComman
|
|||
protected List<HttpRequestFilter> requestFilters = Collections.emptyList();
|
||||
|
||||
@Inject(optional = true)
|
||||
protected HttpErrorHandler httpErrorHandler = new CloseContentAndSetExceptionHandler();
|
||||
private HttpRetryHandler retryHandler = new DelegatingRetryHandler();
|
||||
|
||||
@Inject(optional = true)
|
||||
protected HttpRetryHandler httpRetryHandler = new BackoffLimitedRetryHandler(5);
|
||||
private HttpErrorHandler errorHandler = new DelegatingErrorHandler();
|
||||
|
||||
public void submit(HttpFutureCommand<?> command) {
|
||||
HttpRequest request = command.getRequest();
|
||||
|
@ -66,15 +66,22 @@ public abstract class BaseHttpFutureCommandClient<Q> implements HttpFutureComman
|
|||
}
|
||||
HttpResponse response = null;
|
||||
for (;;) {
|
||||
logger.trace("%1$s - converting request %2$s", request.getEndPoint(), request);
|
||||
logger.trace("%s - converting request %s", request.getEndPoint(), request);
|
||||
nativeRequest = convert(request);
|
||||
response = invoke(nativeRequest);
|
||||
int statusCode = response.getStatusCode();
|
||||
if (statusCode >= 500 && httpRetryHandler.shouldRetryRequest(command, response))
|
||||
continue;
|
||||
break;
|
||||
if (statusCode >= 300) {
|
||||
if (retryHandler.shouldRetryRequest(command, response)) {
|
||||
continue;
|
||||
} else {
|
||||
errorHandler.handleError(command, response);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
processResponse(response, command);
|
||||
break;
|
||||
}
|
||||
}
|
||||
handleResponse(command, response);
|
||||
} catch (Exception e) {
|
||||
command.setException(e);
|
||||
} finally {
|
||||
|
@ -88,14 +95,9 @@ public abstract class BaseHttpFutureCommandClient<Q> implements HttpFutureComman
|
|||
|
||||
protected abstract void cleanup(Q nativeResponse);
|
||||
|
||||
protected void handleResponse(HttpFutureCommand<?> command, HttpResponse response) {
|
||||
int code = response.getStatusCode();
|
||||
if (code >= 300) {
|
||||
httpErrorHandler.handle(command, response);
|
||||
} else {
|
||||
command.getResponseFuture().setResponse(response);
|
||||
command.getResponseFuture().run();
|
||||
}
|
||||
protected void processResponse(HttpResponse response, HttpFutureCommand<?> command) {
|
||||
command.getResponseFuture().setResponse(response);
|
||||
command.getResponseFuture().run();
|
||||
}
|
||||
|
||||
}
|
|
@ -23,6 +23,9 @@
|
|||
*/
|
||||
package org.jclouds.lifecycle;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -41,18 +44,23 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle {
|
|||
@Resource
|
||||
protected Logger logger = Logger.NULL;
|
||||
protected final ExecutorService executor;
|
||||
protected final BaseLifeCycle[] dependencies;
|
||||
protected final List<LifeCycle> dependencies;
|
||||
protected final Object statusLock;
|
||||
protected volatile Status status;
|
||||
protected AtomicReference<Exception> exception = new AtomicReference<Exception>();
|
||||
|
||||
public BaseLifeCycle(ExecutorService executor, BaseLifeCycle... dependencies) {
|
||||
public BaseLifeCycle(ExecutorService executor, LifeCycle... dependencies) {
|
||||
this.executor = executor;
|
||||
this.dependencies = dependencies;
|
||||
this.dependencies = new ArrayList<LifeCycle>();
|
||||
this.dependencies.addAll(Arrays.asList(dependencies));
|
||||
this.statusLock = new Object();
|
||||
this.status = Status.INACTIVE;
|
||||
}
|
||||
|
||||
public void addDependency(LifeCycle lifeCycle) {
|
||||
dependencies.add(lifeCycle);
|
||||
}
|
||||
|
||||
public Status getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
@ -116,14 +124,23 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle {
|
|||
}
|
||||
|
||||
protected void exceptionIfDepedenciesNotActive() {
|
||||
for (BaseLifeCycle dependency : dependencies) {
|
||||
if (dependency.status.compareTo(Status.ACTIVE) != 0) {
|
||||
throw new IllegalStateException(String.format(
|
||||
"Illegal state: %1$s for component: %2$s", dependency.status, dependency));
|
||||
for (LifeCycle dependency : dependencies) {
|
||||
if (dependency.getStatus().compareTo(Status.ACTIVE) != 0) {
|
||||
throw new IllegalStateException(String.format("Illegal state: %s for component: %s",
|
||||
dependency.getStatus(), dependency));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Exception getExceptionFromDependenciesOrNull() {
|
||||
for (LifeCycle dependency : dependencies) {
|
||||
if (dependency.getException() != null) {
|
||||
return dependency.getException();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public Exception getException() {
|
||||
return this.exception.get();
|
||||
}
|
||||
|
|
|
@ -81,10 +81,21 @@ public abstract class BaseHttpFutureCommandClientTest extends BaseJettyTest {
|
|||
assertEquals(get.get(10, TimeUnit.SECONDS).trim(), XML2);
|
||||
}
|
||||
|
||||
@Test(invocationCount = 50, timeOut = 3000)
|
||||
public void testGetStringPermanentRedirect() throws MalformedURLException, ExecutionException,
|
||||
InterruptedException, TimeoutException {
|
||||
// GetString get = factory.createGetString("/permanentredirect");
|
||||
// assert get != null;
|
||||
// client.submit(get);
|
||||
// assertEquals(get.get(10, TimeUnit.SECONDS).trim(), XML2);
|
||||
// TODO assert misses are only one, as permanent redirects paths should be remembered.
|
||||
}
|
||||
|
||||
@Test(invocationCount = 50, timeOut = 3000)
|
||||
public void testPutRedirect() throws MalformedURLException, ExecutionException,
|
||||
InterruptedException, TimeoutException {
|
||||
Put put = factory.createPut("/redirect", "foo");
|
||||
put.getRequest().getHeaders().put(HttpHeaders.CONTENT_LENGTH, "foo".getBytes().length + "");
|
||||
assert put != null;
|
||||
client.submit(put);
|
||||
assertEquals(put.get(10, TimeUnit.SECONDS), new Boolean(true));
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
package org.jclouds.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
@ -135,6 +136,7 @@ public abstract class BaseJettyTest {
|
|||
@Override
|
||||
protected void configure() {
|
||||
Names.bindProperties(binder(), properties);
|
||||
bind(URI.class).toInstance(URI.create("http://localhost:" + testPort));
|
||||
}
|
||||
}, new JDKLoggingModule(), new HttpCommandsModule(), createClientModule(),
|
||||
new AbstractModule() {
|
||||
|
|
Loading…
Reference in New Issue