Issue 1: resolved connection leak and hardened code related to connection handling

git-svn-id: http://jclouds.googlecode.com/svn/trunk@199 3d8758e0-26b5-11de-8745-db77d3ebf521
This commit is contained in:
adrian.f.cole 2009-05-02 20:04:12 +00:00
parent f16f866877
commit 7b10148328
22 changed files with 421 additions and 253 deletions

View File

@ -46,7 +46,7 @@ public class Logger {
logger.fine(String.format(message, args));
}
private boolean isDebugEnabled() {
public boolean isDebugEnabled() {
return logger.isLoggable(Level.FINE);
}

View File

@ -91,25 +91,25 @@ public class FutureCommand<Q, R, T> implements Future<T> {
*/
public static class ResponseRunnableFutureTask<R, T> extends FutureTask<T>
implements ResponseRunnableFuture<R, T> {
private final ResponseCallable<R, T> tCallable;
private final ResponseCallable<R, T> callable;
public ResponseRunnableFutureTask(ResponseCallable<R, T> tCallable) {
super(tCallable);
this.tCallable = tCallable;
this.callable = tCallable;
}
@Override
public String toString() {
return "ResponseRunnableFutureTask{" + "tCallable=" + tCallable
return getClass().getSimpleName()+"{" + "tCallable=" + callable
+ '}';
}
public R getResponse() {
return tCallable.getResponse();
return callable.getResponse();
}
public void setResponse(R response) {
tCallable.setResponse(response);
callable.setResponse(response);
}
/**

View File

@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* // TODO: Adrian: Document this!
*
*
* @author Adrian Cole
*/
public abstract class FutureCommandConnectionPool<C> extends BaseLifeCycle {
@ -46,73 +46,92 @@ public abstract class FutureCommandConnectionPool<C> extends BaseLifeCycle {
protected final FutureCommandConnectionRetry<C> futureCommandConnectionRetry;
protected volatile boolean hitBottom = false;
public FutureCommandConnectionPool(Logger logger, ExecutorService executor, FutureCommandConnectionRetry<C> futureCommandConnectionRetry, Semaphore allConnections, FutureCommandConnectionHandleFactory<C> futureCommandConnectionHandleFactory, @Named("maxConnectionReuse") int maxConnectionReuse, BlockingQueue<C> available, BaseLifeCycle... dependencies) {
super(logger, executor, dependencies);
this.futureCommandConnectionRetry = futureCommandConnectionRetry;
this.allConnections = allConnections;
this.futureCommandConnectionHandleFactory = futureCommandConnectionHandleFactory;
this.maxConnectionReuse = maxConnectionReuse;
this.available = available;
public FutureCommandConnectionPool(
Logger logger,
ExecutorService executor,
FutureCommandConnectionRetry<C> futureCommandConnectionRetry,
Semaphore allConnections,
FutureCommandConnectionHandleFactory<C> futureCommandConnectionHandleFactory,
@Named("maxConnectionReuse") int maxConnectionReuse,
BlockingQueue<C> available, BaseLifeCycle... dependencies) {
super(logger, executor, dependencies);
this.futureCommandConnectionRetry = futureCommandConnectionRetry;
this.allConnections = allConnections;
this.futureCommandConnectionHandleFactory = futureCommandConnectionHandleFactory;
this.maxConnectionReuse = maxConnectionReuse;
this.available = available;
}
@SuppressWarnings("unchecked")
protected void setResponseException(Exception ex, C conn) {
FutureCommand command = futureCommandConnectionRetry.getHandleFromConnection(conn).getOperation();
command.getResponseFuture().setException(ex);
FutureCommand command = futureCommandConnectionRetry
.getHandleFromConnection(conn).getOperation();
command.getResponseFuture().setException(ex);
}
@SuppressWarnings("unchecked")
protected void cancel(C conn) {
FutureCommand command = futureCommandConnectionRetry.getHandleFromConnection(conn).getOperation();
command.cancel(true);
FutureCommand command = futureCommandConnectionRetry
.getHandleFromConnection(conn).getOperation();
command.cancel(true);
}
@Provides
public C getConnection() throws InterruptedException, TimeoutException {
exceptionIfNotActive();
if (!hitBottom) {
hitBottom = available.size() == 0 && allConnections.availablePermits() == 0;
if (hitBottom)
logger.warn("%1s - saturated connection pool", this);
}
logger.debug("%1s - attempting to acquire connection; %d currently available", this, available.size());
C conn = available.poll(1, TimeUnit.SECONDS);
if (conn == null)
throw new TimeoutException("could not obtain a pooled connection within 1 seconds");
exceptionIfNotActive();
if (!hitBottom) {
hitBottom = available.size() == 0
&& allConnections.availablePermits() == 0;
if (hitBottom)
logger.warn("%1s - saturated connection pool", this);
}
logger
.debug(
"%1s - attempting to acquire connection; %d currently available",
this, available.size());
C conn = available.poll(1, TimeUnit.SECONDS);
if (conn == null)
throw new TimeoutException(
"could not obtain a pooled connection within 1 seconds");
logger.trace("%1s - %2d - aquired", conn, conn.hashCode());
if (connectionValid(conn)) {
logger.debug("%1s - %2d - reusing", conn, conn.hashCode());
return conn;
} else {
logger.debug("%1s - %2d - unusable", conn, conn.hashCode());
allConnections.release();
return getConnection();
}
logger.trace("%1s - %2d - aquired", conn, conn.hashCode());
if (connectionValid(conn)) {
logger.debug("%1s - %2d - reusing", conn, conn.hashCode());
return conn;
} else {
logger.debug("%1s - %2d - unusable", conn, conn.hashCode());
shutdownConnection(conn);
allConnections.release();
return getConnection();
}
}
protected void fatalException(Exception ex, C conn) {
setResponseException(ex, conn);
this.exception = ex;
allConnections.release();
shutdown();
setResponseException(ex, conn);
exception.set(ex);
shutdown();
}
protected abstract void shutdownConnection(C conn);
protected abstract boolean connectionValid(C conn);
public FutureCommandConnectionHandle<C> getHandle(FutureCommand<?,?,?> command) throws InterruptedException, TimeoutException {
exceptionIfNotActive();
C conn = getConnection();
FutureCommandConnectionHandle<C> handle = futureCommandConnectionHandleFactory.create(command, conn);
futureCommandConnectionRetry.associateHandleWithConnection(handle, conn);
return handle;
public FutureCommandConnectionHandle<C> getHandle(
FutureCommand<?, ?, ?> command) throws InterruptedException,
TimeoutException {
exceptionIfNotActive();
C conn = getConnection();
FutureCommandConnectionHandle<C> handle = futureCommandConnectionHandleFactory
.create(command, conn);
futureCommandConnectionRetry
.associateHandleWithConnection(handle, conn);
return handle;
}
protected abstract void createNewConnection() throws InterruptedException;
public interface FutureCommandConnectionHandleFactory<C> {
@SuppressWarnings("unchecked")
@SuppressWarnings("unchecked")
FutureCommandConnectionHandle<C> create(FutureCommand command, C conn);
}
}

View File

@ -65,14 +65,13 @@ public class FutureCommandConnectionPoolClient<C> extends BaseLifeCycle
@Override
protected void doShutdown() {
if (exception == null
&& futureCommandConnectionPool.getException() != null)
exception = futureCommandConnectionPool.getException();
exception.compareAndSet(null, futureCommandConnectionPool
.getException());
while (!commandQueue.isEmpty()) {
FutureCommand command = commandQueue.remove();
if (command != null) {
if (exception != null)
command.setException(exception);
if (exception.get() != null)
command.setException(exception.get());
else
command.cancel(true);
}

View File

@ -23,13 +23,13 @@
*/
package org.jclouds.command.pool;
import org.jclouds.Logger;
import org.jclouds.command.FutureCommand;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.jclouds.Logger;
import org.jclouds.command.FutureCommand;
/**
* // TODO: Adrian: Document this!
*
@ -50,17 +50,19 @@ public abstract class FutureCommandConnectionRetry<C> {
public abstract FutureCommandConnectionHandle<C> getHandleFromConnection(C connection);
public void shutdownConnectionAndRetryOperation(C connection) {
public boolean shutdownConnectionAndRetryOperation(C connection) {
FutureCommandConnectionHandle<C> handle = getHandleFromConnection(connection);
if (handle != null) {
try {
logger.info("%1s - shutting down connection", connection);
handle.shutdownConnection();
incrementErrorCountAndRetry(handle.getOperation());
return true;
} catch (IOException e) {
logger.error(e, "%1s - error shutting down connection", connection);
}
}
return false;
}
public void incrementErrorCountAndRetry(FutureCommand command) {

View File

@ -28,6 +28,7 @@ import org.jclouds.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
* // TODO: Adrian: Document this!
@ -40,7 +41,7 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle {
protected final BaseLifeCycle[] dependencies;
protected final Object statusLock;
protected volatile Status status;
protected Exception exception;
protected AtomicReference<Exception> exception = new AtomicReference<Exception>();
public BaseLifeCycle(Logger logger, ExecutorService executor, BaseLifeCycle... dependencies) {
this.logger = logger;
@ -61,7 +62,7 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle {
}
} catch (Exception e) {
logger.error(e, "Exception doing work");
this.exception = e;
exception.set(e);
}
this.status = Status.SHUTTING_DOWN;
doShutdown();
@ -79,7 +80,7 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle {
} catch (IllegalStateException e) {
return false;
}
return status.equals(Status.ACTIVE) && exception == null;
return status.equals(Status.ACTIVE) && exception.get() == null;
}
@PostConstruct
@ -117,7 +118,7 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle {
}
public Exception getException() {
return this.exception;
return this.exception.get();
}
protected void awaitShutdown(long timeout) throws InterruptedException {

View File

@ -29,6 +29,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@ -141,46 +143,49 @@ public abstract class BaseHttpFutureCommandClientTest {
@Test(invocationCount = 500, timeOut = 1500)
void testRequestFilter() throws MalformedURLException, ExecutionException,
InterruptedException {
InterruptedException, TimeoutException {
GetString get = factory.createGetString("/");
get.getRequest().getHeaders().put("filterme", "filterme");
client.submit(get);
assert get.get().trim().equals("test") : String.format(
"expected: [%1s], but got [%2s]", "test", get.get());
assert get.get(10, TimeUnit.SECONDS).trim().equals("test") : String
.format("expected: [%1s], but got [%2s]", "test", get.get(10,
TimeUnit.SECONDS));
}
@Test(invocationCount = 500, timeOut = 1500)
void testGetStringWithHeader() throws MalformedURLException,
ExecutionException, InterruptedException {
ExecutionException, InterruptedException, TimeoutException {
GetString get = factory.createGetString("/");
get.getRequest().getHeaders().put("test", "test");
client.submit(get);
assert get.get().trim().equals("test") : String.format(
"expected: [%1s], but got [%2s]", "test", get.get());
assert get.get(10, TimeUnit.SECONDS).trim().equals("test") : String
.format("expected: [%1s], but got [%2s]", "test", get.get(10,
TimeUnit.SECONDS));
}
@Test(invocationCount = 500, timeOut = 1500)
void testGetString() throws MalformedURLException, ExecutionException,
InterruptedException {
InterruptedException, TimeoutException {
GetString get = factory.createGetString("/");
assert get != null;
client.submit(get);
assert get.get().trim().equals(XML) : String.format(
"expected: [%1s], but got [%2s]", XML, get.get());
assert get.get(10, TimeUnit.SECONDS).trim().equals(XML) : String
.format("expected: [%1s], but got [%2s]", XML, get.get(10,
TimeUnit.SECONDS));
}
@Test(invocationCount = 500, timeOut = 1500)
void testHead() throws MalformedURLException, ExecutionException,
InterruptedException {
InterruptedException, TimeoutException {
Head head = factory.createHead("/");
assert head != null;
client.submit(head);
assert head.get();
assert head.get(10, TimeUnit.SECONDS);
}
@Test(invocationCount = 500, timeOut = 1500)
void testGetAndParseSax() throws MalformedURLException, ExecutionException,
InterruptedException {
InterruptedException, TimeoutException {
GetAndParseSax getAndParseSax = factory.createGetAndParseSax("/",
new ParseSax.HandlerWithResult<String>() {
@Override
@ -207,6 +212,6 @@ public abstract class BaseHttpFutureCommandClientTest {
});
assert getAndParseSax != null;
client.submit(getAndParseSax);
assert getAndParseSax.get().equals("whoppers");
assert getAndParseSax.get(10, TimeUnit.SECONDS).equals("whoppers");
}
}

View File

@ -23,10 +23,9 @@
*/
package org.jclouds.http.httpnio.config.internal;
import com.google.inject.*;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryProvider;
import com.google.inject.name.Named;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
@ -43,7 +42,12 @@ import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.*;
import org.apache.http.protocol.BasicHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.jclouds.command.pool.FutureCommandConnectionRetry;
import org.jclouds.command.pool.PoolConstants;
import org.jclouds.command.pool.config.FutureCommandConnectionPoolClientModule;
@ -52,83 +56,116 @@ import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionPool;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionRetry;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryProvider;
import com.google.inject.name.Named;
/**
* // TODO: Adrian: Document this!
*
*
* @author Adrian Cole
*/
public abstract class BaseHttpNioConnectionPoolClientModule extends FutureCommandConnectionPoolClientModule<NHttpConnection> {
public abstract class BaseHttpNioConnectionPoolClientModule extends
FutureCommandConnectionPoolClientModule<NHttpConnection> {
@Provides
@Singleton
public AsyncNHttpClientHandler provideAsyncNttpClientHandler(BasicHttpProcessor httpProcessor, NHttpRequestExecutionHandler execHandler, ConnectionReuseStrategy connStrategy, ByteBufferAllocator allocator, HttpParams params) {
return new AsyncNHttpClientHandler(httpProcessor, execHandler, connStrategy, allocator, params);
public AsyncNHttpClientHandler provideAsyncNttpClientHandler(
BasicHttpProcessor httpProcessor,
NHttpRequestExecutionHandler execHandler,
ConnectionReuseStrategy connStrategy,
ByteBufferAllocator allocator, HttpParams params) {
return new AsyncNHttpClientHandler(httpProcessor, execHandler,
connStrategy, allocator, params);
}
@Provides
@Singleton
public BasicHttpProcessor provideClientProcessor() {
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new RequestContent());
httpproc.addInterceptor(new RequestTargetHost());
httpproc.addInterceptor(new RequestConnControl());
httpproc.addInterceptor(new RequestUserAgent());
httpproc.addInterceptor(new RequestExpectContinue());
return httpproc;
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new RequestContent());
httpproc.addInterceptor(new RequestTargetHost());
httpproc.addInterceptor(new RequestConnControl());
httpproc.addInterceptor(new RequestUserAgent());
httpproc.addInterceptor(new RequestExpectContinue());
return httpproc;
}
@Provides
@Singleton
public HttpParams provideHttpParams() {
HttpParams params = new BasicHttpParams();
params
.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "jclouds/1.0");
return params;
HttpParams params = new BasicHttpParams();
params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE,
8 * 1024).setBooleanParameter(
CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "jclouds/1.0");
return params;
}
protected void configure() {
super.configure();
bind(HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class).toProvider(FactoryProvider.newFactory(HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class, InjectableBufferingNHttpEntity.class)).in(Scopes.SINGLETON);
bind(NHttpRequestExecutionHandler.class).to(HttpNioFutureCommandExecutionHandler.class).in(Scopes.SINGLETON);
bind(ConnectionReuseStrategy.class).to(DefaultConnectionReuseStrategy.class).in(Scopes.SINGLETON);
bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class);
bind(FutureCommandConnectionRetry.class).to(HttpNioFutureCommandConnectionRetry.class);
bind(HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory.class).toProvider(FactoryProvider.newFactory(new TypeLiteral<HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory>() {
}, new TypeLiteral<HttpNioFutureCommandConnectionHandle>() {
}));
super.configure();
bind(
HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class)
.toProvider(
FactoryProvider
.newFactory(
HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class,
InjectableBufferingNHttpEntity.class))
.in(Scopes.SINGLETON);
bind(NHttpRequestExecutionHandler.class).to(
HttpNioFutureCommandExecutionHandler.class)
.in(Scopes.SINGLETON);
bind(ConnectionReuseStrategy.class).to(
DefaultConnectionReuseStrategy.class).in(Scopes.SINGLETON);
bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class);
bind(FutureCommandConnectionRetry.class).to(
HttpNioFutureCommandConnectionRetry.class);
bind(
HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory.class)
.toProvider(
FactoryProvider
.newFactory(
new TypeLiteral<HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory>() {
},
new TypeLiteral<HttpNioFutureCommandConnectionHandle>() {
}));
}
static class InjectableBufferingNHttpEntity extends BufferingNHttpEntity {
@Inject
public InjectableBufferingNHttpEntity(@Assisted HttpEntity httpEntity, ByteBufferAllocator allocator) {
super(httpEntity, allocator);
}
@Inject
public InjectableBufferingNHttpEntity(@Assisted HttpEntity httpEntity,
ByteBufferAllocator allocator) {
super(httpEntity, allocator);
}
}
@Override
public BlockingQueue<NHttpConnection> provideAvailablePool(@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max) throws Exception {
return new ArrayBlockingQueue<NHttpConnection>(max, true);
public BlockingQueue<NHttpConnection> provideAvailablePool(
@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max)
throws Exception {
return new ArrayBlockingQueue<NHttpConnection>(max, true);
}
@Provides
@Singleton
public abstract IOEventDispatch provideClientEventDispatch(AsyncNHttpClientHandler handler, HttpParams params) throws Exception;
public abstract IOEventDispatch provideClientEventDispatch(
AsyncNHttpClientHandler handler, HttpParams params)
throws Exception;
@Provides
@Singleton
public DefaultConnectingIOReactor provideDefaultConnectingIOReactor(@Named(PoolConstants.PROPERTY_POOL_IO_WORKER_THREADS) int ioWorkerThreads, HttpParams params) throws IOReactorException {
return new DefaultConnectingIOReactor(ioWorkerThreads, params);
public DefaultConnectingIOReactor provideDefaultConnectingIOReactor(
@Named(PoolConstants.PROPERTY_POOL_IO_WORKER_THREADS) int ioWorkerThreads,
HttpParams params) throws IOReactorException {
return new DefaultConnectingIOReactor(ioWorkerThreads, params);
}
}

View File

@ -96,7 +96,7 @@ public class HttpNioFutureCommandConnectionPool extends
try {
ioReactor.execute(dispatch);
} catch (IOException e) {
exception = e;
exception.set(e);
logger.error(e, "Error dispatching %1s", dispatch);
status = Status.SHUTDOWN_REQUEST;
}
@ -115,21 +115,42 @@ public class HttpNioFutureCommandConnectionPool extends
}
}
@Override
public boolean connectionValid(NHttpConnection conn) {
return conn.isOpen() && !conn.isStale()
&& conn.getMetrics().getRequestCount() < maxConnectionReuse;
}
@Override
public void shutdownConnection(NHttpConnection conn) {
if (conn.getMetrics().getRequestCount() >= maxConnectionReuse)
logger.debug(
"%1s - %2d - closing connection due to overuse %1s/%2s",
conn, conn.hashCode(), conn.getMetrics().getRequestCount(),
maxConnectionReuse);
if (conn.getStatus() == NHttpConnection.ACTIVE) {
try {
conn.shutdown();
} catch (IOException e) {
logger.error(e, "Error shutting down connection");
}
}
}
@Override
protected void doWork() throws Exception {
createNewConnection();
}
@Override
protected void doShutdown() {
// Give the I/O reactor 10 sec to shut down
shutdownReactor(10000);
// Give the I/O reactor 1 sec to shut down
shutdownReactor(1000);
assert this.ioReactor.getStatus().equals(IOReactorStatus.SHUT_DOWN) : "incorrect status after io reactor shutdown :"
+ this.ioReactor.getStatus();
}
@Override
protected void createNewConnection() throws InterruptedException {
boolean acquired = allConnections.tryAcquire(1, TimeUnit.SECONDS);
if (acquired) {
@ -142,22 +163,18 @@ public class HttpNioFutureCommandConnectionPool extends
}
}
@Override
protected boolean shouldDoWork() {
return super.shouldDoWork()
&& ioReactor.getStatus().equals(IOReactorStatus.ACTIVE);
}
class NHttpClientConnectionPoolSessionRequestCallback implements
SessionRequestCallback {
public void completed(SessionRequest request) {
logger.trace("%1s - %2s - operation complete", request, request
logger.trace("%1s->%2s[%3s] - SessionRequest complete", request
.getLocalAddress(), request.getRemoteAddress(), request
.getAttachment());
}
public void cancelled(SessionRequest request) {
logger.info("%1s - %2s - operation cancelled", request, request
logger.trace("%1s->%2s[%3s] - SessionRequest cancelled", request
.getLocalAddress(), request.getRemoteAddress(), request
.getAttachment());
releaseConnectionAndCancelResponse(request);
}
@ -167,6 +184,9 @@ public class HttpNioFutureCommandConnectionPool extends
FutureCommand<?, ?, ?> frequest = (FutureCommand<?, ?, ?>) request
.getAttachment();
if (frequest != null) {
logger.error("%1s->%2s[%3s] - Cancelling FutureCommand",
request.getLocalAddress(), request.getRemoteAddress(),
frequest);
frequest.cancel(true);
}
}
@ -177,25 +197,37 @@ public class HttpNioFutureCommandConnectionPool extends
FutureCommand<?, ?, ?> frequest = (FutureCommand<?, ?, ?>) request
.getAttachment();
if (frequest != null) {
logger.error(e,
"%1s->%2s[%3s] - Setting Exception on FutureCommand",
request.getLocalAddress(), request.getRemoteAddress(),
frequest);
frequest.setException(e);
}
}
public void failed(SessionRequest request) {
int count = currentSessionFailures.getAndIncrement();
logger.error(request.getException(),
"%1s - %2s - operation failed", request, request
.getAttachment());
logger.warn("%1s->%2s[%3s] - SessionRequest failed", request
.getLocalAddress(), request.getRemoteAddress(), request
.getAttachment());
releaseConnectionAndSetResponseException(request, request
.getException());
if (count >= maxSessionFailures) {
exception = request.getException();
logger
.error(
request.getException(),
"%1s->%2s[%3s] - SessionRequest failures: %4s, Disabling pool for %5s",
request.getLocalAddress(), request
.getRemoteAddress(),
maxSessionFailures, target);
exception.set(request.getException());
}
}
public void timeout(SessionRequest request) {
logger.warn("%1s - %2s - operation timed out", request, request
logger.warn("%1s->%2s[%3s] - SessionRequest timeout", request
.getLocalAddress(), request.getRemoteAddress(), request
.getAttachment());
releaseConnectionAndCancelResponse(request);
}
@ -211,25 +243,31 @@ public class HttpNioFutureCommandConnectionPool extends
public void connectionTimeout(NHttpConnection conn) {
logger.warn("%1s - %2d - timeout %2d", conn, conn.hashCode(), conn
.getSocketTimeout());
allConnections.release();
futureCommandConnectionRetry.shutdownConnectionAndRetryOperation(conn);
}
public void connectionClosed(NHttpConnection conn) {
allConnections.release();
logger.trace("%1s - %2d - closed", conn, conn.hashCode());
}
public void fatalIOException(IOException ex, NHttpConnection conn) {
exception = ex;
logger.error(ex, "%1s - %2d - %3s - pool error", conn, conn.hashCode(),
exception.set(ex);
logger.error(ex, "%3s-%1d{%2s} - io error", conn, conn.hashCode(),
target);
futureCommandConnectionRetry.shutdownConnectionAndRetryOperation(conn);
if (!futureCommandConnectionRetry
.shutdownConnectionAndRetryOperation(conn))
try {
conn.shutdown();
} catch (IOException e) {
logger.error(e,
"%3s-%1d{%2s} - error shutting down connection", conn,
conn.hashCode(), target);
}
}
public void fatalProtocolException(HttpException ex, NHttpConnection conn) {
exception = ex;
logger.error(ex, "%1s - %2d - %3s - http error", conn, conn.hashCode(),
exception.set(ex);
logger.error(ex, "%3s-%1d{%2s} - http error", conn, conn.hashCode(),
target);
fatalException(ex, conn);
}

View File

@ -55,34 +55,34 @@ public class AmazonPerformance extends BasePerformance {
}
@Override
protected void testPutFileSerial() throws Exception {
public void testPutFileSerial() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void testPutFileParallel() throws InterruptedException,
public void testPutFileParallel() throws InterruptedException,
ExecutionException {
throw new UnsupportedOperationException();
}
@Override
protected void testPutInputStreamSerial() throws Exception {
public void testPutInputStreamSerial() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void testPutInputStreamParallel() throws InterruptedException,
public void testPutInputStreamParallel() throws InterruptedException,
ExecutionException {
throw new UnsupportedOperationException();
}
@Override
protected void testPutStringSerial() throws Exception {
public void testPutStringSerial() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void testPutStringParallel() throws InterruptedException,
public void testPutStringParallel() throws InterruptedException,
ExecutionException {
throw new UnsupportedOperationException();
}

View File

@ -25,6 +25,7 @@ package com.amazon.s3;
import java.io.File;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import org.jclouds.aws.s3.domain.S3Bucket;
@ -46,7 +47,8 @@ public abstract class BaseJCloudsPerformance extends BasePerformance {
// object.setContentType("application/octetstream");
// //object.setContent("this is a test");
// object.setContent(test);
// return clientProvider.getObject(s3Bucket, object.getKey()).get() !=
// return clientProvider.getObject(s3Bucket,
// object.getKey()).get(120,TimeUnit.SECONDS) !=
// org.jclouds.aws.s3.domain.S3Object.NOT_FOUND;
// }
@ -59,7 +61,7 @@ public abstract class BaseJCloudsPerformance extends BasePerformance {
key);
object.setContentType(contentType);
object.setContent(data);
return client.addObject(s3Bucket, object).get() != null;
return client.addObject(s3Bucket, object).get(120, TimeUnit.SECONDS) != null;
}
@Override
@ -70,7 +72,7 @@ public abstract class BaseJCloudsPerformance extends BasePerformance {
key);
object.setContentType(contentType);
object.setContent(data);
return client.addObject(s3Bucket, object).get() != null;
return client.addObject(s3Bucket, object).get(120, TimeUnit.SECONDS) != null;
}
@Override
@ -82,7 +84,7 @@ public abstract class BaseJCloudsPerformance extends BasePerformance {
object.setContentType(contentType);
object.setContent(data);
object.setSize(data.available());
return client.addObject(s3Bucket, object).get() != null;
return client.addObject(s3Bucket, object).get(120, TimeUnit.SECONDS) != null;
}
@Override
@ -93,6 +95,6 @@ public abstract class BaseJCloudsPerformance extends BasePerformance {
key);
object.setContentType(contentType);
object.setContent(data);
return client.addObject(s3Bucket, object).get() != null;
return client.addObject(s3Bucket, object).get(120, TimeUnit.SECONDS) != null;
}
}

View File

@ -32,6 +32,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.jclouds.aws.s3.S3Constants;
@ -51,6 +53,10 @@ import com.google.inject.Provider;
* @author Adrian Cole
*/
public abstract class BasePerformance extends S3IntegrationTest {
protected boolean debugEnabled() {
return false;
}
protected static int LOOP_COUNT = 100;
protected ExecutorService exec;
@ -85,7 +91,8 @@ public abstract class BasePerformance extends S3IntegrationTest {
@Optional String AWSSecretAccessKey) throws Exception {
super.setUpClient(AWSAccessKeyId, AWSSecretAccessKey);
for (String bucket : BUCKETS) {
client.createBucketIfNotExists(new S3Bucket(bucket)).get();
client.createBucketIfNotExists(new S3Bucket(bucket)).get(10,
TimeUnit.SECONDS);
}
}
@ -96,46 +103,46 @@ public abstract class BasePerformance extends S3IntegrationTest {
}
@Test(enabled = true)
protected void testPutBytesSerial() throws Exception {
public void testPutBytesSerial() throws Exception {
doSerial(putBytesCallable, LOOP_COUNT / 10);
}
@Test(enabled = true)
protected void testPutBytesParallel() throws InterruptedException,
ExecutionException {
public void testPutBytesParallel() throws InterruptedException,
ExecutionException, TimeoutException {
doParallel(putBytesCallable, LOOP_COUNT);
}
@Test(enabled = true)
protected void testPutFileSerial() throws Exception {
public void testPutFileSerial() throws Exception {
doSerial(putFileCallable, LOOP_COUNT / 10);
}
@Test(enabled = true)
protected void testPutFileParallel() throws InterruptedException,
ExecutionException {
public void testPutFileParallel() throws InterruptedException,
ExecutionException, TimeoutException {
doParallel(putFileCallable, LOOP_COUNT);
}
@Test(enabled = true)
protected void testPutInputStreamSerial() throws Exception {
public void testPutInputStreamSerial() throws Exception {
doSerial(putInputStreamCallable, LOOP_COUNT / 10);
}
@Test(enabled = true)
protected void testPutInputStreamParallel() throws InterruptedException,
ExecutionException {
public void testPutInputStreamParallel() throws InterruptedException,
ExecutionException, TimeoutException {
doParallel(putInputStreamCallable, LOOP_COUNT);
}
@Test(enabled = true)
protected void testPutStringSerial() throws Exception {
public void testPutStringSerial() throws Exception {
doSerial(putStringCallable, LOOP_COUNT / 10);
}
@Test(enabled = true)
protected void testPutStringParallel() throws InterruptedException,
ExecutionException {
public void testPutStringParallel() throws InterruptedException,
ExecutionException, TimeoutException {
doParallel(putStringCallable, LOOP_COUNT);
}
@ -146,11 +153,11 @@ public abstract class BasePerformance extends S3IntegrationTest {
}
private void doParallel(Provider<Callable<Boolean>> provider, int loopCount)
throws InterruptedException, ExecutionException {
throws InterruptedException, ExecutionException, TimeoutException {
for (int i = 0; i < loopCount; i++)
completer.submit(provider.get());
for (int i = 0; i < loopCount; i++)
assert completer.take().get();
assert completer.take().get(10, TimeUnit.SECONDS);
}
class PutBytesCallable implements Provider<Callable<Boolean>> {
@ -241,17 +248,20 @@ public abstract class BasePerformance extends S3IntegrationTest {
// }
//
// public Boolean call() throws Exception {
// bucket = clientProvider.get().getBucket(bucket).get();
// bucket =
// clientProvider.get(10,TimeUnit.SECONDS).getBucket(bucket).get(10,TimeUnit.SECONDS);
// List<Future<Boolean>> deletes = new ArrayList<Future<Boolean>>();
// for (org.jclouds.aws.s3.domain.S3Object object : bucket
// .getContents()) {
// deletes.add(clientProvider.get().deleteObject(bucket,
// deletes.add(clientProvider.get(10,TimeUnit.SECONDS).deleteObject(bucket,
// object.getKey()));
// }
// for (Future<Boolean> isdeleted : deletes)
// assert isdeleted.get() : String.format("failed to delete %1s",
// assert isdeleted.get(10,TimeUnit.SECONDS) :
// String.format("failed to delete %1s",
// isdeleted);
// return clientProvider.get().deleteBucket(bucket).get();
// return
// clientProvider.get(10,TimeUnit.SECONDS).deleteBucket(bucket).get(10,TimeUnit.SECONDS);
// }
// }
}

View File

@ -50,23 +50,23 @@ public class Jets3tPerformance extends BasePerformance {
}
@Override
protected void testPutStringSerial() throws Exception {
public void testPutStringSerial() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void testPutStringParallel() throws InterruptedException,
public void testPutStringParallel() throws InterruptedException,
ExecutionException {
throw new UnsupportedOperationException();
}
@Override
protected void testPutBytesSerial() throws Exception {
public void testPutBytesSerial() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void testPutBytesParallel() throws InterruptedException,
public void testPutBytesParallel() throws InterruptedException,
ExecutionException {
throw new UnsupportedOperationException();
}

View File

@ -35,4 +35,6 @@ public interface S3Constants extends HttpConstants, PoolConstants {
public static final String AUTH = "Authorization";
public static final String PROPERTY_AWS_SECRETACCESSKEY = "jclouds.aws.secretaccesskey";
public static final String PROPERTY_AWS_ACCESSKEYID = "jclouds.aws.accesskeyid";
public static final String PROPERTY_AWS_MAP_TIMEOUT = "jclouds.aws.map.timeout";
}

View File

@ -23,6 +23,18 @@
*/
package org.jclouds.aws.s3;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.aws.s3.S3Constants.PROPERTY_AWS_ACCESSKEYID;
import static org.jclouds.aws.s3.S3Constants.PROPERTY_AWS_SECRETACCESSKEY;
import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_IO_WORKER_THREADS;
import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS;
import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_MAX_CONNECTION_REUSE;
import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_MAX_SESSION_FAILURES;
import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_REQUEST_INVOKER_THREADS;
import static org.jclouds.http.HttpConstants.PROPERTY_HTTP_ADDRESS;
import static org.jclouds.http.HttpConstants.PROPERTY_HTTP_PORT;
import static org.jclouds.http.HttpConstants.PROPERTY_HTTP_SECURE;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -37,7 +49,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import static org.jclouds.aws.s3.S3Constants.*;
/**
* Creates {@link S3Context} or {@link Injector} instances based on the most
@ -166,7 +178,7 @@ public class S3ContextFactory {
return Guice.createInjector(new AbstractModule() {
@Override
protected void configure() {
Names.bindProperties(binder(), properties);
Names.bindProperties(binder(), checkNotNull(properties,"properties"));
for (Module module : modules)
install(module);
}

View File

@ -62,7 +62,8 @@ public class CopyObjectCallable extends
if (content != null) {
try {
response = Utils.toStringAndClose(content);
System.err.println("Copy response: " + response);
// TODO parse response of format: <CopyObjectResult
// xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><LastModified>2009-05-02T18:29:48.000Z</LastModified><ETag>&quot;29f1a7935898965c45f756e5f936fad2&quot;</ETag></CopyObjectResult>
} catch (IOException e) {
logger.error(e, "error consuming content");
}

View File

@ -37,9 +37,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jclouds.Utils;
import org.jclouds.aws.s3.S3Connection;
import org.jclouds.aws.s3.S3Constants;
import org.jclouds.aws.s3.S3Map;
import org.jclouds.aws.s3.S3Utils;
import org.jclouds.aws.s3.domain.S3Bucket;
@ -47,12 +50,20 @@ import org.jclouds.aws.s3.domain.S3Object;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.name.Named;
public abstract class BaseS3Map<T> implements Map<String, T>, S3Map {
protected final S3Connection connection;
protected final S3Bucket bucket;
/**
* maximum duration of an S3 Request
*/
@Inject(optional = true)
@Named(S3Constants.PROPERTY_AWS_MAP_TIMEOUT)
protected long requestTimeoutMilliseconds = 10000;
@Inject
public BaseS3Map(S3Connection connection, @Assisted S3Bucket bucket) {
this.connection = checkNotNull(connection, "connection");
@ -72,7 +83,7 @@ public abstract class BaseS3Map<T> implements Map<String, T>, S3Map {
}
protected boolean containsETag(String eTagOfValue)
throws InterruptedException, ExecutionException {
throws InterruptedException, ExecutionException, TimeoutException {
for (S3Object object : refreshBucket().getContents()) {
if (object.getETag().equals(eTagOfValue))
return true;
@ -81,7 +92,8 @@ public abstract class BaseS3Map<T> implements Map<String, T>, S3Map {
}
protected byte[] getMd5(Object value) throws IOException,
FileNotFoundException, InterruptedException, ExecutionException {
FileNotFoundException, InterruptedException, ExecutionException,
TimeoutException {
byte[] md5;
if (value instanceof InputStream) {
@ -94,7 +106,8 @@ public abstract class BaseS3Map<T> implements Map<String, T>, S3Map {
md5 = S3Utils.md5(new FileInputStream((File) value));
} else if (value instanceof S3Object) {
S3Object object = (S3Object) value;
object = connection.headObject(bucket, object.getKey()).get();
object = connection.headObject(bucket, object.getKey()).get(
requestTimeoutMilliseconds, TimeUnit.MILLISECONDS);
if (S3Object.NOT_FOUND.equals(object))
throw new FileNotFoundException("not found: " + object.getKey());
md5 = S3Utils.fromHexString(object.getETag());
@ -114,7 +127,8 @@ public abstract class BaseS3Map<T> implements Map<String, T>, S3Map {
for (Future<S3Object> futureObject : futureObjects) {
S3Object object = null;
try {
object = futureObject.get();
object = futureObject.get(requestTimeoutMilliseconds,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
throw new S3RuntimeException(String.format(
@ -164,7 +178,8 @@ public abstract class BaseS3Map<T> implements Map<String, T>, S3Map {
deletes.add(connection.deleteObject(bucket, key));
}
for (Future<Boolean> isdeleted : deletes)
if (!isdeleted.get()) {
if (!isdeleted.get(requestTimeoutMilliseconds,
TimeUnit.MILLISECONDS)) {
throw new S3RuntimeException("failed to delete entry");
}
} catch (Exception e) {
@ -174,8 +189,9 @@ public abstract class BaseS3Map<T> implements Map<String, T>, S3Map {
}
protected S3Bucket refreshBucket() throws InterruptedException,
ExecutionException {
S3Bucket currentBucket = connection.getBucket(bucket).get();
ExecutionException, TimeoutException {
S3Bucket currentBucket = connection.getBucket(bucket).get(
requestTimeoutMilliseconds, TimeUnit.MILLISECONDS);
if (currentBucket == S3Bucket.NOT_FOUND)
throw new S3RuntimeException("bucket not found: "
+ bucket.getName());
@ -198,7 +214,8 @@ public abstract class BaseS3Map<T> implements Map<String, T>, S3Map {
public boolean containsKey(Object key) {
try {
return connection.headObject(bucket, key.toString()).get() != S3Object.NOT_FOUND;
return connection.headObject(bucket, key.toString()).get(
requestTimeoutMilliseconds, TimeUnit.MILLISECONDS) != S3Object.NOT_FOUND;
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
throw new S3RuntimeException(String.format(

View File

@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.jclouds.Utils;
@ -68,7 +69,8 @@ public class LiveS3InputStreamMap extends BaseS3Map<InputStream> implements
public InputStream get(Object o) {
try {
return (InputStream) (connection.getObject(bucket, o.toString())
.get()).getContent();
.get(requestTimeoutMilliseconds, TimeUnit.MILLISECONDS))
.getContent();
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
throw new S3RuntimeException(String.format(
@ -84,7 +86,8 @@ public class LiveS3InputStreamMap extends BaseS3Map<InputStream> implements
public InputStream remove(Object o) {
InputStream old = get(o);
try {
connection.deleteObject(bucket, o.toString()).get();
connection.deleteObject(bucket, o.toString()).get(
requestTimeoutMilliseconds, TimeUnit.MILLISECONDS);
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
throw new S3RuntimeException(String.format(
@ -151,7 +154,8 @@ public class LiveS3InputStreamMap extends BaseS3Map<InputStream> implements
InputStream returnVal = containsKey(s) ? get(s) : null;
object.setContent(o);
setSizeIfContentIsInputStream(object);
connection.addObject(bucket, object).get();
connection.addObject(bucket, object).get(
requestTimeoutMilliseconds, TimeUnit.MILLISECONDS);
return returnVal;
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
@ -191,7 +195,8 @@ public class LiveS3InputStreamMap extends BaseS3Map<InputStream> implements
puts.add(connection.addObject(bucket, object));
}
for (Future<String> put : puts)
put.get();// this will throw an exception if there was a problem
// this will throw an exception if there was a problem
put.get(requestTimeoutMilliseconds, TimeUnit.MILLISECONDS);
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
throw new S3RuntimeException("Error putting into bucket" + bucket,

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.jclouds.Utils;
import org.jclouds.aws.s3.S3Connection;
@ -87,7 +88,8 @@ public class LiveS3ObjectMap extends BaseS3Map<S3Object> implements S3ObjectMap
public S3Object get(Object key) {
try {
return connection.getObject(bucket, key.toString()).get();
return connection.getObject(bucket, key.toString()).get(
requestTimeoutMilliseconds, TimeUnit.MILLISECONDS);
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
throw new S3RuntimeException(String.format(
@ -98,7 +100,8 @@ public class LiveS3ObjectMap extends BaseS3Map<S3Object> implements S3ObjectMap
public S3Object put(String key, S3Object value) {
S3Object returnVal = get(key);
try {
connection.addObject(bucket, value).get();
connection.addObject(bucket, value).get(requestTimeoutMilliseconds,
TimeUnit.MILLISECONDS);
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
throw new S3RuntimeException(String.format(
@ -114,7 +117,8 @@ public class LiveS3ObjectMap extends BaseS3Map<S3Object> implements S3ObjectMap
puts.add(connection.addObject(bucket, object));
}
for (Future<String> put : puts)
put.get();// this will throw an exception if there was a problem
// this will throw an exception if there was a problem
put.get(requestTimeoutMilliseconds, TimeUnit.MILLISECONDS);
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
throw new S3RuntimeException("Error putting into bucket" + bucket,
@ -125,7 +129,8 @@ public class LiveS3ObjectMap extends BaseS3Map<S3Object> implements S3ObjectMap
public S3Object remove(Object key) {
S3Object old = get(key);
try {
connection.deleteObject(bucket, key.toString()).get();
connection.deleteObject(bucket, key.toString()).get(
requestTimeoutMilliseconds, TimeUnit.MILLISECONDS);
} catch (Exception e) {
Utils.<S3RuntimeException> rethrowIfRuntimeOrSameType(e);
throw new S3RuntimeException(String.format(

View File

@ -30,6 +30,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.jclouds.aws.s3.domain.S3Bucket;
@ -43,12 +44,12 @@ public class AmazonS3Test extends S3IntegrationTest {
private String returnedString;
List<S3Bucket> listAllMyBuckets() throws Exception {
return client.getBuckets().get();
return client.getBuckets().get(10,TimeUnit.SECONDS);
}
S3Object getObject() throws Exception {
S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest");
return client.getObject(s3Bucket, "3366").get();
return client.getObject(s3Bucket, "3366").get(10,TimeUnit.SECONDS);
}
String putFileObject() throws Exception {
@ -56,7 +57,7 @@ public class AmazonS3Test extends S3IntegrationTest {
S3Object object = new S3Object("meat");
object.setContentType("text/xml");
object.setContent(new File("pom.xml"));
return client.addObject(s3Bucket, object).get();
return client.addObject(s3Bucket, object).get(10,TimeUnit.SECONDS);
}
@DataProvider(name = "putTests")
@ -75,17 +76,17 @@ public class AmazonS3Test extends S3IntegrationTest {
void testPutObject(String key, String type, Object content,
Object realObject) throws Exception {
S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "filetestsforadrian");
client.createBucketIfNotExists(s3Bucket).get();
client.createBucketIfNotExists(s3Bucket).get(10,TimeUnit.SECONDS);
context.createS3ObjectMap(s3Bucket).clear();
assertEquals(client.getBucket(s3Bucket).get().getContents().size(), 0);
assertEquals(client.getBucket(s3Bucket).get(10,TimeUnit.SECONDS).getContents().size(), 0);
S3Object object = new S3Object(key);
object.setContentType(type);
object.setContent(content);
assertNotNull(client.addObject(s3Bucket, object).get());
object = client.getObject(s3Bucket, object.getKey()).get();
assertNotNull(client.addObject(s3Bucket, object).get(10,TimeUnit.SECONDS));
object = client.getObject(s3Bucket, object.getKey()).get(10,TimeUnit.SECONDS);
returnedString = S3Utils.getContentAsStringAndClose(object);
assertEquals(returnedString, realObject);
assertEquals(client.getBucket(s3Bucket).get().getContents().size(), 1);
assertEquals(client.getBucket(s3Bucket).get(10,TimeUnit.SECONDS).getContents().size(), 1);
}
@Test
@ -93,38 +94,38 @@ public class AmazonS3Test extends S3IntegrationTest {
String realObject = IOUtils.toString(new FileInputStream("pom.xml"));
S3Bucket sourceBucket = new S3Bucket(bucketPrefix + "copysource");
client.createBucketIfNotExists(sourceBucket).get();
assertEquals(client.getBucket(sourceBucket).get().getContents().size(),
client.createBucketIfNotExists(sourceBucket).get(10,TimeUnit.SECONDS);
assertEquals(client.getBucket(sourceBucket).get(10,TimeUnit.SECONDS).getContents().size(),
0);
S3Object sourceObject = new S3Object("file");
sourceObject.setContentType("text/xml");
sourceObject.setContent(new File("pom.xml"));
client.addObject(sourceBucket, sourceObject).get();
assertEquals(client.getBucket(sourceBucket).get().getContents().size(),
client.addObject(sourceBucket, sourceObject).get(10,TimeUnit.SECONDS);
assertEquals(client.getBucket(sourceBucket).get(10,TimeUnit.SECONDS).getContents().size(),
1);
sourceObject = client.getObject(sourceBucket, sourceObject.getKey())
.get();
.get(10,TimeUnit.SECONDS);
assertEquals(S3Utils.getContentAsStringAndClose(sourceObject),
realObject);
S3Bucket destinationBucket = new S3Bucket(bucketPrefix
+ "copydestination");
client.createBucketIfNotExists(destinationBucket).get();
assertEquals(client.getBucket(destinationBucket).get().getContents()
client.createBucketIfNotExists(destinationBucket).get(10,TimeUnit.SECONDS);
assertEquals(client.getBucket(destinationBucket).get(10,TimeUnit.SECONDS).getContents()
.size(), 0);
S3Object destinationObject = new S3Object(sourceObject.getKey());
client.copyObject(sourceBucket, sourceObject, destinationBucket,
destinationObject).get();
assertEquals(client.getBucket(destinationBucket).get().getContents()
destinationObject).get(10,TimeUnit.SECONDS);
assertEquals(client.getBucket(destinationBucket).get(10,TimeUnit.SECONDS).getContents()
.size(), 1);
destinationObject = client.getObject(destinationBucket,
destinationObject.getKey()).get();
destinationObject.getKey()).get(10,TimeUnit.SECONDS);
assertEquals(S3Utils.getContentAsStringAndClose(destinationObject),
realObject);
@ -133,32 +134,32 @@ public class AmazonS3Test extends S3IntegrationTest {
S3Object headObject() throws Exception {
S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest");
return client.headObject(s3Bucket, "3366").get();
return client.headObject(s3Bucket, "3366").get(10,TimeUnit.SECONDS);
}
Boolean bucketExists() throws Exception {
S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest");
return client.bucketExists(s3Bucket).get();
return client.bucketExists(s3Bucket).get(10,TimeUnit.SECONDS);
}
Boolean deleteBucket() throws Exception {
S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest");
return client.deleteBucket(s3Bucket).get();
return client.deleteBucket(s3Bucket).get(10,TimeUnit.SECONDS);
}
Boolean deleteObject() throws Exception {
S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest");
return client.deleteObject(s3Bucket, "3366").get();
return client.deleteObject(s3Bucket, "3366").get(10,TimeUnit.SECONDS);
}
Boolean createBucketIfNotExists() throws Exception {
S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest");
return client.createBucketIfNotExists(s3Bucket).get();
return client.createBucketIfNotExists(s3Bucket).get(10,TimeUnit.SECONDS);
}
S3Bucket getBucket() throws Exception {
S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest");
return client.getBucket(s3Bucket).get();
return client.getBucket(s3Bucket).get(10,TimeUnit.SECONDS);
}
}

View File

@ -33,6 +33,8 @@ import java.io.InputStream;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.jclouds.aws.s3.domain.S3Bucket;
@ -68,7 +70,8 @@ public abstract class BaseS3MapTest<T> extends S3IntegrationTest {
@BeforeMethod
@Parameters( { "basedir" })
protected void setUpTempDir(String basedir) throws InterruptedException,
ExecutionException, FileNotFoundException, IOException {
ExecutionException, FileNotFoundException, IOException,
TimeoutException {
tmpDirectory = basedir + File.separator + "target" + File.separator
+ "testFiles" + File.separator + getClass().getSimpleName();
new File(tmpDirectory).mkdirs();
@ -88,7 +91,7 @@ public abstract class BaseS3MapTest<T> extends S3IntegrationTest {
.toInputStream("dogma"), "five", IOUtils
.toInputStream("emma"));
bucket = new S3Bucket(bucketPrefix + ".mimi");
client.createBucketIfNotExists(bucket).get();
client.createBucketIfNotExists(bucket).get(10, TimeUnit.SECONDS);
map = createMap(context, bucket);
map.clear();
}
@ -105,11 +108,11 @@ public abstract class BaseS3MapTest<T> extends S3IntegrationTest {
@Test
public void testClear() {
map.clear();
assertEquals(map.size(),0);
assertEquals(map.size(), 0);
putString("one", "apple");
assertEquals(map.size(),1);
assertEquals(map.size(), 1);
map.clear();
assertEquals(map.size(),0);
assertEquals(map.size(), 0);
}
@Test()

View File

@ -32,6 +32,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.ConsoleHandler;
import java.util.logging.Formatter;
import java.util.logging.Handler;
@ -53,29 +54,32 @@ import com.google.inject.Module;
@Test(sequential = true)
public class S3IntegrationTest {
@BeforeTest
void enableDebug() {
if (debugEnabled()) {
Handler HANDLER = new ConsoleHandler() {
{
setLevel(Level.ALL);
setFormatter(new Formatter() {
private static final Handler HANDLER = new ConsoleHandler() {
{
setLevel(Level.ALL);
setFormatter(new Formatter() {
@Override
public String format(LogRecord record) {
return String.format("[%tT %-7s] [%-7s] [%s]: %s %s\n",
new Date(record.getMillis()), record.getLevel(),
Thread.currentThread().getName(), record
.getLoggerName(), record.getMessage(),
record.getThrown() == null ? "" : record
.getThrown());
@Override
public String format(LogRecord record) {
return String.format(
"[%tT %-7s] [%-7s] [%s]: %s %s\n",
new Date(record.getMillis()), record
.getLevel(), Thread.currentThread()
.getName(), record.getLoggerName(),
record.getMessage(),
record.getThrown() == null ? "" : record
.getThrown());
}
});
}
});
};
Logger guiceLogger = Logger.getLogger("org.jclouds");
guiceLogger.addHandler(HANDLER);
guiceLogger.setLevel(Level.ALL);
}
};
static {
Logger guiceLogger = Logger.getLogger("org.jclouds");
guiceLogger.addHandler(HANDLER);
guiceLogger.setLevel(Level.ALL);
}
String badRequestWhenSourceIsDestBucketOnCopy400 = "<Error><Code>InvalidRequest</Code><Message>The Source and Destination may not be the same when the MetadataDirective is Copy.</Message><RequestId>54C77CAF4D42474B</RequestId><HostId>SJecknEUUUx88/65VAKbCdKSOCkpuVTeu7ZG9in9x9NTNglGnoxdbALCfS4k/DUZ</HostId></Error>";
@ -113,6 +117,10 @@ public class S3IntegrationTest {
deleteEverything();
}
protected boolean debugEnabled() {
return true;
}
protected S3Context createS3Context(String AWSAccessKeyId,
String AWSSecretAccessKey) {
return S3ContextFactory.createS3Context(buildS3Properties(
@ -129,7 +137,7 @@ public class S3IntegrationTest {
checkNotNull(AWSSecretAccessKey, "AWSSecretAccessKey"));
properties.setProperty(HttpConstants.PROPERTY_HTTP_SECURE, "false");
properties.setProperty(HttpConstants.PROPERTY_HTTP_PORT, "80");
// properties.setProperty("jclouds.http.sax.debug", "true");
// properties.setProperty("jclouds.http.sax.debug", "true");
return properties;
}
@ -139,21 +147,22 @@ public class S3IntegrationTest {
protected void deleteEverything() throws Exception {
try {
List<S3Bucket> buckets = client.getBuckets().get();
List<S3Bucket> buckets = client.getBuckets().get(10,
TimeUnit.SECONDS);
List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
for (S3Bucket bucket : buckets) {
if (bucket.getName().startsWith(bucketPrefix.toLowerCase())) {
bucket = client.getBucket(bucket).get();
bucket = client.getBucket(bucket).get(10, TimeUnit.SECONDS);
for (S3Object object : bucket.getContents()) {
results.add(client
.deleteObject(bucket, object.getKey()));
}
Iterator<Future<Boolean>> iterator = results.iterator();
while (iterator.hasNext()) {
iterator.next().get();
iterator.next().get(10, TimeUnit.SECONDS);
iterator.remove();
}
client.deleteBucket(bucket).get();
client.deleteBucket(bucket).get(10, TimeUnit.SECONDS);
}
}
@ -164,7 +173,7 @@ public class S3IntegrationTest {
@AfterTest
protected void tearDownClient() throws Exception {
// deleteEverything();
// deleteEverything();
context.close();
context = null;
}