From 7b101483285469b346685256defffe3f231a8ac7 Mon Sep 17 00:00:00 2001 From: "adrian.f.cole" Date: Sat, 2 May 2009 20:04:12 +0000 Subject: [PATCH] Issue 1: resolved connection leak and hardened code related to connection handling git-svn-id: http://jclouds.googlecode.com/svn/trunk@199 3d8758e0-26b5-11de-8745-db77d3ebf521 --- core/src/main/java/org/jclouds/Logger.java | 2 +- .../org/jclouds/command/FutureCommand.java | 10 +- .../pool/FutureCommandConnectionPool.java | 105 ++++++++------ .../FutureCommandConnectionPoolClient.java | 9 +- .../pool/FutureCommandConnectionRetry.java | 10 +- .../org/jclouds/lifecycle/BaseLifeCycle.java | 9 +- .../http/BaseHttpFutureCommandClientTest.java | 31 +++-- ...BaseHttpNioConnectionPoolClientModule.java | 131 +++++++++++------- .../HttpNioFutureCommandConnectionPool.java | 84 ++++++++--- .../java/com/amazon/s3/AmazonPerformance.java | 12 +- .../com/amazon/s3/BaseJCloudsPerformance.java | 12 +- .../java/com/amazon/s3/BasePerformance.java | 48 ++++--- .../java/com/amazon/s3/Jets3tPerformance.java | 8 +- .../java/org/jclouds/aws/s3/S3Constants.java | 2 + .../org/jclouds/aws/s3/S3ContextFactory.java | 16 ++- .../callables/CopyObjectCallable.java | 3 +- .../jclouds/aws/s3/internal/BaseS3Map.java | 33 +++-- .../aws/s3/internal/LiveS3InputStreamMap.java | 13 +- .../aws/s3/internal/LiveS3ObjectMap.java | 13 +- .../java/org/jclouds/aws/s3/AmazonS3Test.java | 49 +++---- .../org/jclouds/aws/s3/BaseS3MapTest.java | 13 +- .../org/jclouds/aws/s3/S3IntegrationTest.java | 61 ++++---- 22 files changed, 421 insertions(+), 253 deletions(-) diff --git a/core/src/main/java/org/jclouds/Logger.java b/core/src/main/java/org/jclouds/Logger.java index 8e8c755a91..3f0dfe6391 100644 --- a/core/src/main/java/org/jclouds/Logger.java +++ b/core/src/main/java/org/jclouds/Logger.java @@ -46,7 +46,7 @@ public class Logger { logger.fine(String.format(message, args)); } - private boolean isDebugEnabled() { + public boolean isDebugEnabled() { return logger.isLoggable(Level.FINE); } diff --git a/core/src/main/java/org/jclouds/command/FutureCommand.java b/core/src/main/java/org/jclouds/command/FutureCommand.java index 883c9dfb30..6e3f03053f 100644 --- a/core/src/main/java/org/jclouds/command/FutureCommand.java +++ b/core/src/main/java/org/jclouds/command/FutureCommand.java @@ -91,25 +91,25 @@ public class FutureCommand implements Future { */ public static class ResponseRunnableFutureTask extends FutureTask implements ResponseRunnableFuture { - private final ResponseCallable tCallable; + private final ResponseCallable callable; public ResponseRunnableFutureTask(ResponseCallable tCallable) { super(tCallable); - this.tCallable = tCallable; + this.callable = tCallable; } @Override public String toString() { - return "ResponseRunnableFutureTask{" + "tCallable=" + tCallable + return getClass().getSimpleName()+"{" + "tCallable=" + callable + '}'; } public R getResponse() { - return tCallable.getResponse(); + return callable.getResponse(); } public void setResponse(R response) { - tCallable.setResponse(response); + callable.setResponse(response); } /** diff --git a/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionPool.java b/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionPool.java index 1c0b74eb34..241863e720 100644 --- a/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionPool.java +++ b/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionPool.java @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * // TODO: Adrian: Document this! - * + * * @author Adrian Cole */ public abstract class FutureCommandConnectionPool extends BaseLifeCycle { @@ -46,73 +46,92 @@ public abstract class FutureCommandConnectionPool extends BaseLifeCycle { protected final FutureCommandConnectionRetry futureCommandConnectionRetry; protected volatile boolean hitBottom = false; - public FutureCommandConnectionPool(Logger logger, ExecutorService executor, FutureCommandConnectionRetry futureCommandConnectionRetry, Semaphore allConnections, FutureCommandConnectionHandleFactory futureCommandConnectionHandleFactory, @Named("maxConnectionReuse") int maxConnectionReuse, BlockingQueue available, BaseLifeCycle... dependencies) { - super(logger, executor, dependencies); - this.futureCommandConnectionRetry = futureCommandConnectionRetry; - this.allConnections = allConnections; - this.futureCommandConnectionHandleFactory = futureCommandConnectionHandleFactory; - this.maxConnectionReuse = maxConnectionReuse; - this.available = available; + public FutureCommandConnectionPool( + Logger logger, + ExecutorService executor, + FutureCommandConnectionRetry futureCommandConnectionRetry, + Semaphore allConnections, + FutureCommandConnectionHandleFactory futureCommandConnectionHandleFactory, + @Named("maxConnectionReuse") int maxConnectionReuse, + BlockingQueue available, BaseLifeCycle... dependencies) { + super(logger, executor, dependencies); + this.futureCommandConnectionRetry = futureCommandConnectionRetry; + this.allConnections = allConnections; + this.futureCommandConnectionHandleFactory = futureCommandConnectionHandleFactory; + this.maxConnectionReuse = maxConnectionReuse; + this.available = available; } @SuppressWarnings("unchecked") protected void setResponseException(Exception ex, C conn) { - FutureCommand command = futureCommandConnectionRetry.getHandleFromConnection(conn).getOperation(); - command.getResponseFuture().setException(ex); + FutureCommand command = futureCommandConnectionRetry + .getHandleFromConnection(conn).getOperation(); + command.getResponseFuture().setException(ex); } @SuppressWarnings("unchecked") protected void cancel(C conn) { - FutureCommand command = futureCommandConnectionRetry.getHandleFromConnection(conn).getOperation(); - command.cancel(true); + FutureCommand command = futureCommandConnectionRetry + .getHandleFromConnection(conn).getOperation(); + command.cancel(true); } - @Provides public C getConnection() throws InterruptedException, TimeoutException { - exceptionIfNotActive(); - if (!hitBottom) { - hitBottom = available.size() == 0 && allConnections.availablePermits() == 0; - if (hitBottom) - logger.warn("%1s - saturated connection pool", this); - } - logger.debug("%1s - attempting to acquire connection; %d currently available", this, available.size()); - C conn = available.poll(1, TimeUnit.SECONDS); - if (conn == null) - throw new TimeoutException("could not obtain a pooled connection within 1 seconds"); + exceptionIfNotActive(); + if (!hitBottom) { + hitBottom = available.size() == 0 + && allConnections.availablePermits() == 0; + if (hitBottom) + logger.warn("%1s - saturated connection pool", this); + } + logger + .debug( + "%1s - attempting to acquire connection; %d currently available", + this, available.size()); + C conn = available.poll(1, TimeUnit.SECONDS); + if (conn == null) + throw new TimeoutException( + "could not obtain a pooled connection within 1 seconds"); - logger.trace("%1s - %2d - aquired", conn, conn.hashCode()); - if (connectionValid(conn)) { - logger.debug("%1s - %2d - reusing", conn, conn.hashCode()); - return conn; - } else { - logger.debug("%1s - %2d - unusable", conn, conn.hashCode()); - allConnections.release(); - return getConnection(); - } + logger.trace("%1s - %2d - aquired", conn, conn.hashCode()); + if (connectionValid(conn)) { + logger.debug("%1s - %2d - reusing", conn, conn.hashCode()); + return conn; + } else { + logger.debug("%1s - %2d - unusable", conn, conn.hashCode()); + shutdownConnection(conn); + allConnections.release(); + return getConnection(); + } } protected void fatalException(Exception ex, C conn) { - setResponseException(ex, conn); - this.exception = ex; - allConnections.release(); - shutdown(); + setResponseException(ex, conn); + exception.set(ex); + shutdown(); } + protected abstract void shutdownConnection(C conn); + protected abstract boolean connectionValid(C conn); - public FutureCommandConnectionHandle getHandle(FutureCommand command) throws InterruptedException, TimeoutException { - exceptionIfNotActive(); - C conn = getConnection(); - FutureCommandConnectionHandle handle = futureCommandConnectionHandleFactory.create(command, conn); - futureCommandConnectionRetry.associateHandleWithConnection(handle, conn); - return handle; + public FutureCommandConnectionHandle getHandle( + FutureCommand command) throws InterruptedException, + TimeoutException { + exceptionIfNotActive(); + C conn = getConnection(); + FutureCommandConnectionHandle handle = futureCommandConnectionHandleFactory + .create(command, conn); + futureCommandConnectionRetry + .associateHandleWithConnection(handle, conn); + return handle; } protected abstract void createNewConnection() throws InterruptedException; public interface FutureCommandConnectionHandleFactory { - @SuppressWarnings("unchecked") + @SuppressWarnings("unchecked") FutureCommandConnectionHandle create(FutureCommand command, C conn); } } diff --git a/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionPoolClient.java b/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionPoolClient.java index 71ae7f346e..29c538144c 100644 --- a/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionPoolClient.java +++ b/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionPoolClient.java @@ -65,14 +65,13 @@ public class FutureCommandConnectionPoolClient extends BaseLifeCycle @Override protected void doShutdown() { - if (exception == null - && futureCommandConnectionPool.getException() != null) - exception = futureCommandConnectionPool.getException(); + exception.compareAndSet(null, futureCommandConnectionPool + .getException()); while (!commandQueue.isEmpty()) { FutureCommand command = commandQueue.remove(); if (command != null) { - if (exception != null) - command.setException(exception); + if (exception.get() != null) + command.setException(exception.get()); else command.cancel(true); } diff --git a/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionRetry.java b/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionRetry.java index 6212be4629..4b96f267fd 100644 --- a/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionRetry.java +++ b/core/src/main/java/org/jclouds/command/pool/FutureCommandConnectionRetry.java @@ -23,13 +23,13 @@ */ package org.jclouds.command.pool; -import org.jclouds.Logger; -import org.jclouds.command.FutureCommand; - import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import org.jclouds.Logger; +import org.jclouds.command.FutureCommand; + /** * // TODO: Adrian: Document this! * @@ -50,17 +50,19 @@ public abstract class FutureCommandConnectionRetry { public abstract FutureCommandConnectionHandle getHandleFromConnection(C connection); - public void shutdownConnectionAndRetryOperation(C connection) { + public boolean shutdownConnectionAndRetryOperation(C connection) { FutureCommandConnectionHandle handle = getHandleFromConnection(connection); if (handle != null) { try { logger.info("%1s - shutting down connection", connection); handle.shutdownConnection(); incrementErrorCountAndRetry(handle.getOperation()); + return true; } catch (IOException e) { logger.error(e, "%1s - error shutting down connection", connection); } } + return false; } public void incrementErrorCountAndRetry(FutureCommand command) { diff --git a/core/src/main/java/org/jclouds/lifecycle/BaseLifeCycle.java b/core/src/main/java/org/jclouds/lifecycle/BaseLifeCycle.java index 9460a31aca..5aa65771d3 100644 --- a/core/src/main/java/org/jclouds/lifecycle/BaseLifeCycle.java +++ b/core/src/main/java/org/jclouds/lifecycle/BaseLifeCycle.java @@ -28,6 +28,7 @@ import org.jclouds.Logger; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; /** * // TODO: Adrian: Document this! @@ -40,7 +41,7 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle { protected final BaseLifeCycle[] dependencies; protected final Object statusLock; protected volatile Status status; - protected Exception exception; + protected AtomicReference exception = new AtomicReference(); public BaseLifeCycle(Logger logger, ExecutorService executor, BaseLifeCycle... dependencies) { this.logger = logger; @@ -61,7 +62,7 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle { } } catch (Exception e) { logger.error(e, "Exception doing work"); - this.exception = e; + exception.set(e); } this.status = Status.SHUTTING_DOWN; doShutdown(); @@ -79,7 +80,7 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle { } catch (IllegalStateException e) { return false; } - return status.equals(Status.ACTIVE) && exception == null; + return status.equals(Status.ACTIVE) && exception.get() == null; } @PostConstruct @@ -117,7 +118,7 @@ public abstract class BaseLifeCycle implements Runnable, LifeCycle { } public Exception getException() { - return this.exception; + return this.exception.get(); } protected void awaitShutdown(long timeout) throws InterruptedException { diff --git a/core/src/test/java/org/jclouds/http/BaseHttpFutureCommandClientTest.java b/core/src/test/java/org/jclouds/http/BaseHttpFutureCommandClientTest.java index 572685c202..861c1db025 100644 --- a/core/src/test/java/org/jclouds/http/BaseHttpFutureCommandClientTest.java +++ b/core/src/test/java/org/jclouds/http/BaseHttpFutureCommandClientTest.java @@ -29,6 +29,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -141,46 +143,49 @@ public abstract class BaseHttpFutureCommandClientTest { @Test(invocationCount = 500, timeOut = 1500) void testRequestFilter() throws MalformedURLException, ExecutionException, - InterruptedException { + InterruptedException, TimeoutException { GetString get = factory.createGetString("/"); get.getRequest().getHeaders().put("filterme", "filterme"); client.submit(get); - assert get.get().trim().equals("test") : String.format( - "expected: [%1s], but got [%2s]", "test", get.get()); + assert get.get(10, TimeUnit.SECONDS).trim().equals("test") : String + .format("expected: [%1s], but got [%2s]", "test", get.get(10, + TimeUnit.SECONDS)); } @Test(invocationCount = 500, timeOut = 1500) void testGetStringWithHeader() throws MalformedURLException, - ExecutionException, InterruptedException { + ExecutionException, InterruptedException, TimeoutException { GetString get = factory.createGetString("/"); get.getRequest().getHeaders().put("test", "test"); client.submit(get); - assert get.get().trim().equals("test") : String.format( - "expected: [%1s], but got [%2s]", "test", get.get()); + assert get.get(10, TimeUnit.SECONDS).trim().equals("test") : String + .format("expected: [%1s], but got [%2s]", "test", get.get(10, + TimeUnit.SECONDS)); } @Test(invocationCount = 500, timeOut = 1500) void testGetString() throws MalformedURLException, ExecutionException, - InterruptedException { + InterruptedException, TimeoutException { GetString get = factory.createGetString("/"); assert get != null; client.submit(get); - assert get.get().trim().equals(XML) : String.format( - "expected: [%1s], but got [%2s]", XML, get.get()); + assert get.get(10, TimeUnit.SECONDS).trim().equals(XML) : String + .format("expected: [%1s], but got [%2s]", XML, get.get(10, + TimeUnit.SECONDS)); } @Test(invocationCount = 500, timeOut = 1500) void testHead() throws MalformedURLException, ExecutionException, - InterruptedException { + InterruptedException, TimeoutException { Head head = factory.createHead("/"); assert head != null; client.submit(head); - assert head.get(); + assert head.get(10, TimeUnit.SECONDS); } @Test(invocationCount = 500, timeOut = 1500) void testGetAndParseSax() throws MalformedURLException, ExecutionException, - InterruptedException { + InterruptedException, TimeoutException { GetAndParseSax getAndParseSax = factory.createGetAndParseSax("/", new ParseSax.HandlerWithResult() { @Override @@ -207,6 +212,6 @@ public abstract class BaseHttpFutureCommandClientTest { }); assert getAndParseSax != null; client.submit(getAndParseSax); - assert getAndParseSax.get().equals("whoppers"); + assert getAndParseSax.get(10, TimeUnit.SECONDS).equals("whoppers"); } } diff --git a/extensions/httpnio/src/main/java/org/jclouds/http/httpnio/config/internal/BaseHttpNioConnectionPoolClientModule.java b/extensions/httpnio/src/main/java/org/jclouds/http/httpnio/config/internal/BaseHttpNioConnectionPoolClientModule.java index 5935fbbc1f..99a1d12b1c 100644 --- a/extensions/httpnio/src/main/java/org/jclouds/http/httpnio/config/internal/BaseHttpNioConnectionPoolClientModule.java +++ b/extensions/httpnio/src/main/java/org/jclouds/http/httpnio/config/internal/BaseHttpNioConnectionPoolClientModule.java @@ -23,10 +23,9 @@ */ package org.jclouds.http.httpnio.config.internal; -import com.google.inject.*; -import com.google.inject.assistedinject.Assisted; -import com.google.inject.assistedinject.FactoryProvider; -import com.google.inject.name.Named; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + import org.apache.http.ConnectionReuseStrategy; import org.apache.http.HttpEntity; import org.apache.http.impl.DefaultConnectionReuseStrategy; @@ -43,7 +42,12 @@ import org.apache.http.params.BasicHttpParams; import org.apache.http.params.CoreConnectionPNames; import org.apache.http.params.CoreProtocolPNames; import org.apache.http.params.HttpParams; -import org.apache.http.protocol.*; +import org.apache.http.protocol.BasicHttpProcessor; +import org.apache.http.protocol.RequestConnControl; +import org.apache.http.protocol.RequestContent; +import org.apache.http.protocol.RequestExpectContinue; +import org.apache.http.protocol.RequestTargetHost; +import org.apache.http.protocol.RequestUserAgent; import org.jclouds.command.pool.FutureCommandConnectionRetry; import org.jclouds.command.pool.PoolConstants; import org.jclouds.command.pool.config.FutureCommandConnectionPoolClientModule; @@ -52,83 +56,116 @@ import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionPool; import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionRetry; import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import com.google.inject.Inject; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import com.google.inject.TypeLiteral; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.FactoryProvider; +import com.google.inject.name.Named; /** * // TODO: Adrian: Document this! - * + * * @author Adrian Cole */ -public abstract class BaseHttpNioConnectionPoolClientModule extends FutureCommandConnectionPoolClientModule { +public abstract class BaseHttpNioConnectionPoolClientModule extends + FutureCommandConnectionPoolClientModule { @Provides @Singleton - public AsyncNHttpClientHandler provideAsyncNttpClientHandler(BasicHttpProcessor httpProcessor, NHttpRequestExecutionHandler execHandler, ConnectionReuseStrategy connStrategy, ByteBufferAllocator allocator, HttpParams params) { - return new AsyncNHttpClientHandler(httpProcessor, execHandler, connStrategy, allocator, params); + public AsyncNHttpClientHandler provideAsyncNttpClientHandler( + BasicHttpProcessor httpProcessor, + NHttpRequestExecutionHandler execHandler, + ConnectionReuseStrategy connStrategy, + ByteBufferAllocator allocator, HttpParams params) { + return new AsyncNHttpClientHandler(httpProcessor, execHandler, + connStrategy, allocator, params); } @Provides @Singleton public BasicHttpProcessor provideClientProcessor() { - BasicHttpProcessor httpproc = new BasicHttpProcessor(); - httpproc.addInterceptor(new RequestContent()); - httpproc.addInterceptor(new RequestTargetHost()); - httpproc.addInterceptor(new RequestConnControl()); - httpproc.addInterceptor(new RequestUserAgent()); - httpproc.addInterceptor(new RequestExpectContinue()); - return httpproc; + BasicHttpProcessor httpproc = new BasicHttpProcessor(); + httpproc.addInterceptor(new RequestContent()); + httpproc.addInterceptor(new RequestTargetHost()); + httpproc.addInterceptor(new RequestConnControl()); + httpproc.addInterceptor(new RequestUserAgent()); + httpproc.addInterceptor(new RequestExpectContinue()); + return httpproc; } @Provides @Singleton public HttpParams provideHttpParams() { - HttpParams params = new BasicHttpParams(); - params - .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000) - .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024) - .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false) - .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true) - .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "jclouds/1.0"); - return params; + HttpParams params = new BasicHttpParams(); + params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000) + .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, + 8 * 1024).setBooleanParameter( + CoreConnectionPNames.STALE_CONNECTION_CHECK, false) + .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true) + .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "jclouds/1.0"); + return params; } protected void configure() { - super.configure(); - bind(HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class).toProvider(FactoryProvider.newFactory(HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class, InjectableBufferingNHttpEntity.class)).in(Scopes.SINGLETON); - bind(NHttpRequestExecutionHandler.class).to(HttpNioFutureCommandExecutionHandler.class).in(Scopes.SINGLETON); - bind(ConnectionReuseStrategy.class).to(DefaultConnectionReuseStrategy.class).in(Scopes.SINGLETON); - bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class); - bind(FutureCommandConnectionRetry.class).to(HttpNioFutureCommandConnectionRetry.class); - bind(HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory.class).toProvider(FactoryProvider.newFactory(new TypeLiteral() { - }, new TypeLiteral() { - })); + super.configure(); + bind( + HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class) + .toProvider( + FactoryProvider + .newFactory( + HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class, + InjectableBufferingNHttpEntity.class)) + .in(Scopes.SINGLETON); + bind(NHttpRequestExecutionHandler.class).to( + HttpNioFutureCommandExecutionHandler.class) + .in(Scopes.SINGLETON); + bind(ConnectionReuseStrategy.class).to( + DefaultConnectionReuseStrategy.class).in(Scopes.SINGLETON); + bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class); + bind(FutureCommandConnectionRetry.class).to( + HttpNioFutureCommandConnectionRetry.class); + bind( + HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory.class) + .toProvider( + FactoryProvider + .newFactory( + new TypeLiteral() { + }, + new TypeLiteral() { + })); } - static class InjectableBufferingNHttpEntity extends BufferingNHttpEntity { - @Inject - public InjectableBufferingNHttpEntity(@Assisted HttpEntity httpEntity, ByteBufferAllocator allocator) { - super(httpEntity, allocator); - } + @Inject + public InjectableBufferingNHttpEntity(@Assisted HttpEntity httpEntity, + ByteBufferAllocator allocator) { + super(httpEntity, allocator); + } } @Override - public BlockingQueue provideAvailablePool(@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max) throws Exception { - return new ArrayBlockingQueue(max, true); + public BlockingQueue provideAvailablePool( + @Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max) + throws Exception { + return new ArrayBlockingQueue(max, true); } - @Provides @Singleton - public abstract IOEventDispatch provideClientEventDispatch(AsyncNHttpClientHandler handler, HttpParams params) throws Exception; + public abstract IOEventDispatch provideClientEventDispatch( + AsyncNHttpClientHandler handler, HttpParams params) + throws Exception; @Provides @Singleton - public DefaultConnectingIOReactor provideDefaultConnectingIOReactor(@Named(PoolConstants.PROPERTY_POOL_IO_WORKER_THREADS) int ioWorkerThreads, HttpParams params) throws IOReactorException { - return new DefaultConnectingIOReactor(ioWorkerThreads, params); + public DefaultConnectingIOReactor provideDefaultConnectingIOReactor( + @Named(PoolConstants.PROPERTY_POOL_IO_WORKER_THREADS) int ioWorkerThreads, + HttpParams params) throws IOReactorException { + return new DefaultConnectingIOReactor(ioWorkerThreads, params); } - } \ No newline at end of file diff --git a/extensions/httpnio/src/main/java/org/jclouds/http/httpnio/pool/HttpNioFutureCommandConnectionPool.java b/extensions/httpnio/src/main/java/org/jclouds/http/httpnio/pool/HttpNioFutureCommandConnectionPool.java index 0192c1897f..de61d13911 100644 --- a/extensions/httpnio/src/main/java/org/jclouds/http/httpnio/pool/HttpNioFutureCommandConnectionPool.java +++ b/extensions/httpnio/src/main/java/org/jclouds/http/httpnio/pool/HttpNioFutureCommandConnectionPool.java @@ -96,7 +96,7 @@ public class HttpNioFutureCommandConnectionPool extends try { ioReactor.execute(dispatch); } catch (IOException e) { - exception = e; + exception.set(e); logger.error(e, "Error dispatching %1s", dispatch); status = Status.SHUTDOWN_REQUEST; } @@ -115,21 +115,42 @@ public class HttpNioFutureCommandConnectionPool extends } } + @Override public boolean connectionValid(NHttpConnection conn) { return conn.isOpen() && !conn.isStale() && conn.getMetrics().getRequestCount() < maxConnectionReuse; } + @Override + public void shutdownConnection(NHttpConnection conn) { + if (conn.getMetrics().getRequestCount() >= maxConnectionReuse) + logger.debug( + "%1s - %2d - closing connection due to overuse %1s/%2s", + conn, conn.hashCode(), conn.getMetrics().getRequestCount(), + maxConnectionReuse); + if (conn.getStatus() == NHttpConnection.ACTIVE) { + try { + conn.shutdown(); + } catch (IOException e) { + logger.error(e, "Error shutting down connection"); + } + } + } + + @Override protected void doWork() throws Exception { createNewConnection(); } @Override protected void doShutdown() { - // Give the I/O reactor 10 sec to shut down - shutdownReactor(10000); + // Give the I/O reactor 1 sec to shut down + shutdownReactor(1000); + assert this.ioReactor.getStatus().equals(IOReactorStatus.SHUT_DOWN) : "incorrect status after io reactor shutdown :" + + this.ioReactor.getStatus(); } + @Override protected void createNewConnection() throws InterruptedException { boolean acquired = allConnections.tryAcquire(1, TimeUnit.SECONDS); if (acquired) { @@ -142,22 +163,18 @@ public class HttpNioFutureCommandConnectionPool extends } } - @Override - protected boolean shouldDoWork() { - return super.shouldDoWork() - && ioReactor.getStatus().equals(IOReactorStatus.ACTIVE); - } - class NHttpClientConnectionPoolSessionRequestCallback implements SessionRequestCallback { public void completed(SessionRequest request) { - logger.trace("%1s - %2s - operation complete", request, request + logger.trace("%1s->%2s[%3s] - SessionRequest complete", request + .getLocalAddress(), request.getRemoteAddress(), request .getAttachment()); } public void cancelled(SessionRequest request) { - logger.info("%1s - %2s - operation cancelled", request, request + logger.trace("%1s->%2s[%3s] - SessionRequest cancelled", request + .getLocalAddress(), request.getRemoteAddress(), request .getAttachment()); releaseConnectionAndCancelResponse(request); } @@ -167,6 +184,9 @@ public class HttpNioFutureCommandConnectionPool extends FutureCommand frequest = (FutureCommand) request .getAttachment(); if (frequest != null) { + logger.error("%1s->%2s[%3s] - Cancelling FutureCommand", + request.getLocalAddress(), request.getRemoteAddress(), + frequest); frequest.cancel(true); } } @@ -177,25 +197,37 @@ public class HttpNioFutureCommandConnectionPool extends FutureCommand frequest = (FutureCommand) request .getAttachment(); if (frequest != null) { + logger.error(e, + "%1s->%2s[%3s] - Setting Exception on FutureCommand", + request.getLocalAddress(), request.getRemoteAddress(), + frequest); frequest.setException(e); } } public void failed(SessionRequest request) { int count = currentSessionFailures.getAndIncrement(); - logger.error(request.getException(), - "%1s - %2s - operation failed", request, request - .getAttachment()); + logger.warn("%1s->%2s[%3s] - SessionRequest failed", request + .getLocalAddress(), request.getRemoteAddress(), request + .getAttachment()); releaseConnectionAndSetResponseException(request, request .getException()); if (count >= maxSessionFailures) { - exception = request.getException(); + logger + .error( + request.getException(), + "%1s->%2s[%3s] - SessionRequest failures: %4s, Disabling pool for %5s", + request.getLocalAddress(), request + .getRemoteAddress(), + maxSessionFailures, target); + exception.set(request.getException()); } } public void timeout(SessionRequest request) { - logger.warn("%1s - %2s - operation timed out", request, request + logger.warn("%1s->%2s[%3s] - SessionRequest timeout", request + .getLocalAddress(), request.getRemoteAddress(), request .getAttachment()); releaseConnectionAndCancelResponse(request); } @@ -211,25 +243,31 @@ public class HttpNioFutureCommandConnectionPool extends public void connectionTimeout(NHttpConnection conn) { logger.warn("%1s - %2d - timeout %2d", conn, conn.hashCode(), conn .getSocketTimeout()); - allConnections.release(); futureCommandConnectionRetry.shutdownConnectionAndRetryOperation(conn); } public void connectionClosed(NHttpConnection conn) { - allConnections.release(); logger.trace("%1s - %2d - closed", conn, conn.hashCode()); } public void fatalIOException(IOException ex, NHttpConnection conn) { - exception = ex; - logger.error(ex, "%1s - %2d - %3s - pool error", conn, conn.hashCode(), + exception.set(ex); + logger.error(ex, "%3s-%1d{%2s} - io error", conn, conn.hashCode(), target); - futureCommandConnectionRetry.shutdownConnectionAndRetryOperation(conn); + if (!futureCommandConnectionRetry + .shutdownConnectionAndRetryOperation(conn)) + try { + conn.shutdown(); + } catch (IOException e) { + logger.error(e, + "%3s-%1d{%2s} - error shutting down connection", conn, + conn.hashCode(), target); + } } public void fatalProtocolException(HttpException ex, NHttpConnection conn) { - exception = ex; - logger.error(ex, "%1s - %2d - %3s - http error", conn, conn.hashCode(), + exception.set(ex); + logger.error(ex, "%3s-%1d{%2s} - http error", conn, conn.hashCode(), target); fatalException(ex, conn); } diff --git a/s3/perftest/src/test/java/com/amazon/s3/AmazonPerformance.java b/s3/perftest/src/test/java/com/amazon/s3/AmazonPerformance.java index cd04270e22..3ced0fccab 100644 --- a/s3/perftest/src/test/java/com/amazon/s3/AmazonPerformance.java +++ b/s3/perftest/src/test/java/com/amazon/s3/AmazonPerformance.java @@ -55,34 +55,34 @@ public class AmazonPerformance extends BasePerformance { } @Override - protected void testPutFileSerial() throws Exception { + public void testPutFileSerial() throws Exception { throw new UnsupportedOperationException(); } @Override - protected void testPutFileParallel() throws InterruptedException, + public void testPutFileParallel() throws InterruptedException, ExecutionException { throw new UnsupportedOperationException(); } @Override - protected void testPutInputStreamSerial() throws Exception { + public void testPutInputStreamSerial() throws Exception { throw new UnsupportedOperationException(); } @Override - protected void testPutInputStreamParallel() throws InterruptedException, + public void testPutInputStreamParallel() throws InterruptedException, ExecutionException { throw new UnsupportedOperationException(); } @Override - protected void testPutStringSerial() throws Exception { + public void testPutStringSerial() throws Exception { throw new UnsupportedOperationException(); } @Override - protected void testPutStringParallel() throws InterruptedException, + public void testPutStringParallel() throws InterruptedException, ExecutionException { throw new UnsupportedOperationException(); } diff --git a/s3/perftest/src/test/java/com/amazon/s3/BaseJCloudsPerformance.java b/s3/perftest/src/test/java/com/amazon/s3/BaseJCloudsPerformance.java index da33e8a1db..4a814abb13 100644 --- a/s3/perftest/src/test/java/com/amazon/s3/BaseJCloudsPerformance.java +++ b/s3/perftest/src/test/java/com/amazon/s3/BaseJCloudsPerformance.java @@ -25,6 +25,7 @@ package com.amazon.s3; import java.io.File; import java.io.InputStream; +import java.util.concurrent.TimeUnit; import org.jclouds.aws.s3.domain.S3Bucket; @@ -46,7 +47,8 @@ public abstract class BaseJCloudsPerformance extends BasePerformance { // object.setContentType("application/octetstream"); // //object.setContent("this is a test"); // object.setContent(test); - // return clientProvider.getObject(s3Bucket, object.getKey()).get() != + // return clientProvider.getObject(s3Bucket, + // object.getKey()).get(120,TimeUnit.SECONDS) != // org.jclouds.aws.s3.domain.S3Object.NOT_FOUND; // } @@ -59,7 +61,7 @@ public abstract class BaseJCloudsPerformance extends BasePerformance { key); object.setContentType(contentType); object.setContent(data); - return client.addObject(s3Bucket, object).get() != null; + return client.addObject(s3Bucket, object).get(120, TimeUnit.SECONDS) != null; } @Override @@ -70,7 +72,7 @@ public abstract class BaseJCloudsPerformance extends BasePerformance { key); object.setContentType(contentType); object.setContent(data); - return client.addObject(s3Bucket, object).get() != null; + return client.addObject(s3Bucket, object).get(120, TimeUnit.SECONDS) != null; } @Override @@ -82,7 +84,7 @@ public abstract class BaseJCloudsPerformance extends BasePerformance { object.setContentType(contentType); object.setContent(data); object.setSize(data.available()); - return client.addObject(s3Bucket, object).get() != null; + return client.addObject(s3Bucket, object).get(120, TimeUnit.SECONDS) != null; } @Override @@ -93,6 +95,6 @@ public abstract class BaseJCloudsPerformance extends BasePerformance { key); object.setContentType(contentType); object.setContent(data); - return client.addObject(s3Bucket, object).get() != null; + return client.addObject(s3Bucket, object).get(120, TimeUnit.SECONDS) != null; } } diff --git a/s3/perftest/src/test/java/com/amazon/s3/BasePerformance.java b/s3/perftest/src/test/java/com/amazon/s3/BasePerformance.java index 84e5e18d08..522f5e33f7 100644 --- a/s3/perftest/src/test/java/com/amazon/s3/BasePerformance.java +++ b/s3/perftest/src/test/java/com/amazon/s3/BasePerformance.java @@ -32,6 +32,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.jclouds.aws.s3.S3Constants; @@ -51,6 +53,10 @@ import com.google.inject.Provider; * @author Adrian Cole */ public abstract class BasePerformance extends S3IntegrationTest { + protected boolean debugEnabled() { + return false; + } + protected static int LOOP_COUNT = 100; protected ExecutorService exec; @@ -85,7 +91,8 @@ public abstract class BasePerformance extends S3IntegrationTest { @Optional String AWSSecretAccessKey) throws Exception { super.setUpClient(AWSAccessKeyId, AWSSecretAccessKey); for (String bucket : BUCKETS) { - client.createBucketIfNotExists(new S3Bucket(bucket)).get(); + client.createBucketIfNotExists(new S3Bucket(bucket)).get(10, + TimeUnit.SECONDS); } } @@ -96,46 +103,46 @@ public abstract class BasePerformance extends S3IntegrationTest { } @Test(enabled = true) - protected void testPutBytesSerial() throws Exception { + public void testPutBytesSerial() throws Exception { doSerial(putBytesCallable, LOOP_COUNT / 10); } @Test(enabled = true) - protected void testPutBytesParallel() throws InterruptedException, - ExecutionException { + public void testPutBytesParallel() throws InterruptedException, + ExecutionException, TimeoutException { doParallel(putBytesCallable, LOOP_COUNT); } @Test(enabled = true) - protected void testPutFileSerial() throws Exception { + public void testPutFileSerial() throws Exception { doSerial(putFileCallable, LOOP_COUNT / 10); } @Test(enabled = true) - protected void testPutFileParallel() throws InterruptedException, - ExecutionException { + public void testPutFileParallel() throws InterruptedException, + ExecutionException, TimeoutException { doParallel(putFileCallable, LOOP_COUNT); } @Test(enabled = true) - protected void testPutInputStreamSerial() throws Exception { + public void testPutInputStreamSerial() throws Exception { doSerial(putInputStreamCallable, LOOP_COUNT / 10); } @Test(enabled = true) - protected void testPutInputStreamParallel() throws InterruptedException, - ExecutionException { + public void testPutInputStreamParallel() throws InterruptedException, + ExecutionException, TimeoutException { doParallel(putInputStreamCallable, LOOP_COUNT); } @Test(enabled = true) - protected void testPutStringSerial() throws Exception { + public void testPutStringSerial() throws Exception { doSerial(putStringCallable, LOOP_COUNT / 10); } @Test(enabled = true) - protected void testPutStringParallel() throws InterruptedException, - ExecutionException { + public void testPutStringParallel() throws InterruptedException, + ExecutionException, TimeoutException { doParallel(putStringCallable, LOOP_COUNT); } @@ -146,11 +153,11 @@ public abstract class BasePerformance extends S3IntegrationTest { } private void doParallel(Provider> provider, int loopCount) - throws InterruptedException, ExecutionException { + throws InterruptedException, ExecutionException, TimeoutException { for (int i = 0; i < loopCount; i++) completer.submit(provider.get()); for (int i = 0; i < loopCount; i++) - assert completer.take().get(); + assert completer.take().get(10, TimeUnit.SECONDS); } class PutBytesCallable implements Provider> { @@ -241,17 +248,20 @@ public abstract class BasePerformance extends S3IntegrationTest { // } // // public Boolean call() throws Exception { - // bucket = clientProvider.get().getBucket(bucket).get(); + // bucket = + // clientProvider.get(10,TimeUnit.SECONDS).getBucket(bucket).get(10,TimeUnit.SECONDS); // List> deletes = new ArrayList>(); // for (org.jclouds.aws.s3.domain.S3Object object : bucket // .getContents()) { - // deletes.add(clientProvider.get().deleteObject(bucket, + // deletes.add(clientProvider.get(10,TimeUnit.SECONDS).deleteObject(bucket, // object.getKey())); // } // for (Future isdeleted : deletes) - // assert isdeleted.get() : String.format("failed to delete %1s", + // assert isdeleted.get(10,TimeUnit.SECONDS) : + // String.format("failed to delete %1s", // isdeleted); - // return clientProvider.get().deleteBucket(bucket).get(); + // return + // clientProvider.get(10,TimeUnit.SECONDS).deleteBucket(bucket).get(10,TimeUnit.SECONDS); // } // } } diff --git a/s3/perftest/src/test/java/com/amazon/s3/Jets3tPerformance.java b/s3/perftest/src/test/java/com/amazon/s3/Jets3tPerformance.java index 4587d49ae9..f43b059f55 100644 --- a/s3/perftest/src/test/java/com/amazon/s3/Jets3tPerformance.java +++ b/s3/perftest/src/test/java/com/amazon/s3/Jets3tPerformance.java @@ -50,23 +50,23 @@ public class Jets3tPerformance extends BasePerformance { } @Override - protected void testPutStringSerial() throws Exception { + public void testPutStringSerial() throws Exception { throw new UnsupportedOperationException(); } @Override - protected void testPutStringParallel() throws InterruptedException, + public void testPutStringParallel() throws InterruptedException, ExecutionException { throw new UnsupportedOperationException(); } @Override - protected void testPutBytesSerial() throws Exception { + public void testPutBytesSerial() throws Exception { throw new UnsupportedOperationException(); } @Override - protected void testPutBytesParallel() throws InterruptedException, + public void testPutBytesParallel() throws InterruptedException, ExecutionException { throw new UnsupportedOperationException(); } diff --git a/s3/src/main/java/org/jclouds/aws/s3/S3Constants.java b/s3/src/main/java/org/jclouds/aws/s3/S3Constants.java index e9cc371499..281c65a6dc 100644 --- a/s3/src/main/java/org/jclouds/aws/s3/S3Constants.java +++ b/s3/src/main/java/org/jclouds/aws/s3/S3Constants.java @@ -35,4 +35,6 @@ public interface S3Constants extends HttpConstants, PoolConstants { public static final String AUTH = "Authorization"; public static final String PROPERTY_AWS_SECRETACCESSKEY = "jclouds.aws.secretaccesskey"; public static final String PROPERTY_AWS_ACCESSKEYID = "jclouds.aws.accesskeyid"; + public static final String PROPERTY_AWS_MAP_TIMEOUT = "jclouds.aws.map.timeout"; + } diff --git a/s3/src/main/java/org/jclouds/aws/s3/S3ContextFactory.java b/s3/src/main/java/org/jclouds/aws/s3/S3ContextFactory.java index 2ad8d4a996..6c4ee48d78 100644 --- a/s3/src/main/java/org/jclouds/aws/s3/S3ContextFactory.java +++ b/s3/src/main/java/org/jclouds/aws/s3/S3ContextFactory.java @@ -23,6 +23,18 @@ */ package org.jclouds.aws.s3; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.jclouds.aws.s3.S3Constants.PROPERTY_AWS_ACCESSKEYID; +import static org.jclouds.aws.s3.S3Constants.PROPERTY_AWS_SECRETACCESSKEY; +import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_IO_WORKER_THREADS; +import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS; +import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_MAX_CONNECTION_REUSE; +import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_MAX_SESSION_FAILURES; +import static org.jclouds.command.pool.PoolConstants.PROPERTY_POOL_REQUEST_INVOKER_THREADS; +import static org.jclouds.http.HttpConstants.PROPERTY_HTTP_ADDRESS; +import static org.jclouds.http.HttpConstants.PROPERTY_HTTP_PORT; +import static org.jclouds.http.HttpConstants.PROPERTY_HTTP_SECURE; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -37,7 +49,7 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Module; import com.google.inject.name.Names; -import static org.jclouds.aws.s3.S3Constants.*; + /** * Creates {@link S3Context} or {@link Injector} instances based on the most @@ -166,7 +178,7 @@ public class S3ContextFactory { return Guice.createInjector(new AbstractModule() { @Override protected void configure() { - Names.bindProperties(binder(), properties); + Names.bindProperties(binder(), checkNotNull(properties,"properties")); for (Module module : modules) install(module); } diff --git a/s3/src/main/java/org/jclouds/aws/s3/commands/callables/CopyObjectCallable.java b/s3/src/main/java/org/jclouds/aws/s3/commands/callables/CopyObjectCallable.java index 409ae5a8bf..90459c12ca 100644 --- a/s3/src/main/java/org/jclouds/aws/s3/commands/callables/CopyObjectCallable.java +++ b/s3/src/main/java/org/jclouds/aws/s3/commands/callables/CopyObjectCallable.java @@ -62,7 +62,8 @@ public class CopyObjectCallable extends if (content != null) { try { response = Utils.toStringAndClose(content); - System.err.println("Copy response: " + response); + // TODO parse response of format: 2009-05-02T18:29:48.000Z"29f1a7935898965c45f756e5f936fad2" } catch (IOException e) { logger.error(e, "error consuming content"); } diff --git a/s3/src/main/java/org/jclouds/aws/s3/internal/BaseS3Map.java b/s3/src/main/java/org/jclouds/aws/s3/internal/BaseS3Map.java index 2e1d05bc0e..9be4c013c4 100644 --- a/s3/src/main/java/org/jclouds/aws/s3/internal/BaseS3Map.java +++ b/s3/src/main/java/org/jclouds/aws/s3/internal/BaseS3Map.java @@ -37,9 +37,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.jclouds.Utils; import org.jclouds.aws.s3.S3Connection; +import org.jclouds.aws.s3.S3Constants; import org.jclouds.aws.s3.S3Map; import org.jclouds.aws.s3.S3Utils; import org.jclouds.aws.s3.domain.S3Bucket; @@ -47,12 +50,20 @@ import org.jclouds.aws.s3.domain.S3Object; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; +import com.google.inject.name.Named; public abstract class BaseS3Map implements Map, S3Map { protected final S3Connection connection; protected final S3Bucket bucket; + /** + * maximum duration of an S3 Request + */ + @Inject(optional = true) + @Named(S3Constants.PROPERTY_AWS_MAP_TIMEOUT) + protected long requestTimeoutMilliseconds = 10000; + @Inject public BaseS3Map(S3Connection connection, @Assisted S3Bucket bucket) { this.connection = checkNotNull(connection, "connection"); @@ -72,7 +83,7 @@ public abstract class BaseS3Map implements Map, S3Map { } protected boolean containsETag(String eTagOfValue) - throws InterruptedException, ExecutionException { + throws InterruptedException, ExecutionException, TimeoutException { for (S3Object object : refreshBucket().getContents()) { if (object.getETag().equals(eTagOfValue)) return true; @@ -81,7 +92,8 @@ public abstract class BaseS3Map implements Map, S3Map { } protected byte[] getMd5(Object value) throws IOException, - FileNotFoundException, InterruptedException, ExecutionException { + FileNotFoundException, InterruptedException, ExecutionException, + TimeoutException { byte[] md5; if (value instanceof InputStream) { @@ -94,7 +106,8 @@ public abstract class BaseS3Map implements Map, S3Map { md5 = S3Utils.md5(new FileInputStream((File) value)); } else if (value instanceof S3Object) { S3Object object = (S3Object) value; - object = connection.headObject(bucket, object.getKey()).get(); + object = connection.headObject(bucket, object.getKey()).get( + requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); if (S3Object.NOT_FOUND.equals(object)) throw new FileNotFoundException("not found: " + object.getKey()); md5 = S3Utils.fromHexString(object.getETag()); @@ -114,7 +127,8 @@ public abstract class BaseS3Map implements Map, S3Map { for (Future futureObject : futureObjects) { S3Object object = null; try { - object = futureObject.get(); + object = futureObject.get(requestTimeoutMilliseconds, + TimeUnit.MILLISECONDS); } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); throw new S3RuntimeException(String.format( @@ -164,7 +178,8 @@ public abstract class BaseS3Map implements Map, S3Map { deletes.add(connection.deleteObject(bucket, key)); } for (Future isdeleted : deletes) - if (!isdeleted.get()) { + if (!isdeleted.get(requestTimeoutMilliseconds, + TimeUnit.MILLISECONDS)) { throw new S3RuntimeException("failed to delete entry"); } } catch (Exception e) { @@ -174,8 +189,9 @@ public abstract class BaseS3Map implements Map, S3Map { } protected S3Bucket refreshBucket() throws InterruptedException, - ExecutionException { - S3Bucket currentBucket = connection.getBucket(bucket).get(); + ExecutionException, TimeoutException { + S3Bucket currentBucket = connection.getBucket(bucket).get( + requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); if (currentBucket == S3Bucket.NOT_FOUND) throw new S3RuntimeException("bucket not found: " + bucket.getName()); @@ -198,7 +214,8 @@ public abstract class BaseS3Map implements Map, S3Map { public boolean containsKey(Object key) { try { - return connection.headObject(bucket, key.toString()).get() != S3Object.NOT_FOUND; + return connection.headObject(bucket, key.toString()).get( + requestTimeoutMilliseconds, TimeUnit.MILLISECONDS) != S3Object.NOT_FOUND; } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); throw new S3RuntimeException(String.format( diff --git a/s3/src/main/java/org/jclouds/aws/s3/internal/LiveS3InputStreamMap.java b/s3/src/main/java/org/jclouds/aws/s3/internal/LiveS3InputStreamMap.java index d5a18361da..3ad09980e1 100644 --- a/s3/src/main/java/org/jclouds/aws/s3/internal/LiveS3InputStreamMap.java +++ b/s3/src/main/java/org/jclouds/aws/s3/internal/LiveS3InputStreamMap.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; import org.jclouds.Utils; @@ -68,7 +69,8 @@ public class LiveS3InputStreamMap extends BaseS3Map implements public InputStream get(Object o) { try { return (InputStream) (connection.getObject(bucket, o.toString()) - .get()).getContent(); + .get(requestTimeoutMilliseconds, TimeUnit.MILLISECONDS)) + .getContent(); } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); throw new S3RuntimeException(String.format( @@ -84,7 +86,8 @@ public class LiveS3InputStreamMap extends BaseS3Map implements public InputStream remove(Object o) { InputStream old = get(o); try { - connection.deleteObject(bucket, o.toString()).get(); + connection.deleteObject(bucket, o.toString()).get( + requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); throw new S3RuntimeException(String.format( @@ -151,7 +154,8 @@ public class LiveS3InputStreamMap extends BaseS3Map implements InputStream returnVal = containsKey(s) ? get(s) : null; object.setContent(o); setSizeIfContentIsInputStream(object); - connection.addObject(bucket, object).get(); + connection.addObject(bucket, object).get( + requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); return returnVal; } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); @@ -191,7 +195,8 @@ public class LiveS3InputStreamMap extends BaseS3Map implements puts.add(connection.addObject(bucket, object)); } for (Future put : puts) - put.get();// this will throw an exception if there was a problem + // this will throw an exception if there was a problem + put.get(requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); throw new S3RuntimeException("Error putting into bucket" + bucket, diff --git a/s3/src/main/java/org/jclouds/aws/s3/internal/LiveS3ObjectMap.java b/s3/src/main/java/org/jclouds/aws/s3/internal/LiveS3ObjectMap.java index 277c65dac4..e28b81b8d8 100644 --- a/s3/src/main/java/org/jclouds/aws/s3/internal/LiveS3ObjectMap.java +++ b/s3/src/main/java/org/jclouds/aws/s3/internal/LiveS3ObjectMap.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.jclouds.Utils; import org.jclouds.aws.s3.S3Connection; @@ -87,7 +88,8 @@ public class LiveS3ObjectMap extends BaseS3Map implements S3ObjectMap public S3Object get(Object key) { try { - return connection.getObject(bucket, key.toString()).get(); + return connection.getObject(bucket, key.toString()).get( + requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); throw new S3RuntimeException(String.format( @@ -98,7 +100,8 @@ public class LiveS3ObjectMap extends BaseS3Map implements S3ObjectMap public S3Object put(String key, S3Object value) { S3Object returnVal = get(key); try { - connection.addObject(bucket, value).get(); + connection.addObject(bucket, value).get(requestTimeoutMilliseconds, + TimeUnit.MILLISECONDS); } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); throw new S3RuntimeException(String.format( @@ -114,7 +117,8 @@ public class LiveS3ObjectMap extends BaseS3Map implements S3ObjectMap puts.add(connection.addObject(bucket, object)); } for (Future put : puts) - put.get();// this will throw an exception if there was a problem + // this will throw an exception if there was a problem + put.get(requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); throw new S3RuntimeException("Error putting into bucket" + bucket, @@ -125,7 +129,8 @@ public class LiveS3ObjectMap extends BaseS3Map implements S3ObjectMap public S3Object remove(Object key) { S3Object old = get(key); try { - connection.deleteObject(bucket, key.toString()).get(); + connection.deleteObject(bucket, key.toString()).get( + requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); } catch (Exception e) { Utils. rethrowIfRuntimeOrSameType(e); throw new S3RuntimeException(String.format( diff --git a/s3/src/test/java/org/jclouds/aws/s3/AmazonS3Test.java b/s3/src/test/java/org/jclouds/aws/s3/AmazonS3Test.java index 6e8c00a34d..4f7f8b3f34 100644 --- a/s3/src/test/java/org/jclouds/aws/s3/AmazonS3Test.java +++ b/s3/src/test/java/org/jclouds/aws/s3/AmazonS3Test.java @@ -30,6 +30,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; import org.jclouds.aws.s3.domain.S3Bucket; @@ -43,12 +44,12 @@ public class AmazonS3Test extends S3IntegrationTest { private String returnedString; List listAllMyBuckets() throws Exception { - return client.getBuckets().get(); + return client.getBuckets().get(10,TimeUnit.SECONDS); } S3Object getObject() throws Exception { S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest"); - return client.getObject(s3Bucket, "3366").get(); + return client.getObject(s3Bucket, "3366").get(10,TimeUnit.SECONDS); } String putFileObject() throws Exception { @@ -56,7 +57,7 @@ public class AmazonS3Test extends S3IntegrationTest { S3Object object = new S3Object("meat"); object.setContentType("text/xml"); object.setContent(new File("pom.xml")); - return client.addObject(s3Bucket, object).get(); + return client.addObject(s3Bucket, object).get(10,TimeUnit.SECONDS); } @DataProvider(name = "putTests") @@ -75,17 +76,17 @@ public class AmazonS3Test extends S3IntegrationTest { void testPutObject(String key, String type, Object content, Object realObject) throws Exception { S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "filetestsforadrian"); - client.createBucketIfNotExists(s3Bucket).get(); + client.createBucketIfNotExists(s3Bucket).get(10,TimeUnit.SECONDS); context.createS3ObjectMap(s3Bucket).clear(); - assertEquals(client.getBucket(s3Bucket).get().getContents().size(), 0); + assertEquals(client.getBucket(s3Bucket).get(10,TimeUnit.SECONDS).getContents().size(), 0); S3Object object = new S3Object(key); object.setContentType(type); object.setContent(content); - assertNotNull(client.addObject(s3Bucket, object).get()); - object = client.getObject(s3Bucket, object.getKey()).get(); + assertNotNull(client.addObject(s3Bucket, object).get(10,TimeUnit.SECONDS)); + object = client.getObject(s3Bucket, object.getKey()).get(10,TimeUnit.SECONDS); returnedString = S3Utils.getContentAsStringAndClose(object); assertEquals(returnedString, realObject); - assertEquals(client.getBucket(s3Bucket).get().getContents().size(), 1); + assertEquals(client.getBucket(s3Bucket).get(10,TimeUnit.SECONDS).getContents().size(), 1); } @Test @@ -93,38 +94,38 @@ public class AmazonS3Test extends S3IntegrationTest { String realObject = IOUtils.toString(new FileInputStream("pom.xml")); S3Bucket sourceBucket = new S3Bucket(bucketPrefix + "copysource"); - client.createBucketIfNotExists(sourceBucket).get(); - assertEquals(client.getBucket(sourceBucket).get().getContents().size(), + client.createBucketIfNotExists(sourceBucket).get(10,TimeUnit.SECONDS); + assertEquals(client.getBucket(sourceBucket).get(10,TimeUnit.SECONDS).getContents().size(), 0); S3Object sourceObject = new S3Object("file"); sourceObject.setContentType("text/xml"); sourceObject.setContent(new File("pom.xml")); - client.addObject(sourceBucket, sourceObject).get(); - assertEquals(client.getBucket(sourceBucket).get().getContents().size(), + client.addObject(sourceBucket, sourceObject).get(10,TimeUnit.SECONDS); + assertEquals(client.getBucket(sourceBucket).get(10,TimeUnit.SECONDS).getContents().size(), 1); sourceObject = client.getObject(sourceBucket, sourceObject.getKey()) - .get(); + .get(10,TimeUnit.SECONDS); assertEquals(S3Utils.getContentAsStringAndClose(sourceObject), realObject); S3Bucket destinationBucket = new S3Bucket(bucketPrefix + "copydestination"); - client.createBucketIfNotExists(destinationBucket).get(); - assertEquals(client.getBucket(destinationBucket).get().getContents() + client.createBucketIfNotExists(destinationBucket).get(10,TimeUnit.SECONDS); + assertEquals(client.getBucket(destinationBucket).get(10,TimeUnit.SECONDS).getContents() .size(), 0); S3Object destinationObject = new S3Object(sourceObject.getKey()); client.copyObject(sourceBucket, sourceObject, destinationBucket, - destinationObject).get(); - assertEquals(client.getBucket(destinationBucket).get().getContents() + destinationObject).get(10,TimeUnit.SECONDS); + assertEquals(client.getBucket(destinationBucket).get(10,TimeUnit.SECONDS).getContents() .size(), 1); destinationObject = client.getObject(destinationBucket, - destinationObject.getKey()).get(); + destinationObject.getKey()).get(10,TimeUnit.SECONDS); assertEquals(S3Utils.getContentAsStringAndClose(destinationObject), realObject); @@ -133,32 +134,32 @@ public class AmazonS3Test extends S3IntegrationTest { S3Object headObject() throws Exception { S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest"); - return client.headObject(s3Bucket, "3366").get(); + return client.headObject(s3Bucket, "3366").get(10,TimeUnit.SECONDS); } Boolean bucketExists() throws Exception { S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest"); - return client.bucketExists(s3Bucket).get(); + return client.bucketExists(s3Bucket).get(10,TimeUnit.SECONDS); } Boolean deleteBucket() throws Exception { S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest"); - return client.deleteBucket(s3Bucket).get(); + return client.deleteBucket(s3Bucket).get(10,TimeUnit.SECONDS); } Boolean deleteObject() throws Exception { S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest"); - return client.deleteObject(s3Bucket, "3366").get(); + return client.deleteObject(s3Bucket, "3366").get(10,TimeUnit.SECONDS); } Boolean createBucketIfNotExists() throws Exception { S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest"); - return client.createBucketIfNotExists(s3Bucket).get(); + return client.createBucketIfNotExists(s3Bucket).get(10,TimeUnit.SECONDS); } S3Bucket getBucket() throws Exception { S3Bucket s3Bucket = new S3Bucket(bucketPrefix + "adrianjbosstest"); - return client.getBucket(s3Bucket).get(); + return client.getBucket(s3Bucket).get(10,TimeUnit.SECONDS); } } \ No newline at end of file diff --git a/s3/src/test/java/org/jclouds/aws/s3/BaseS3MapTest.java b/s3/src/test/java/org/jclouds/aws/s3/BaseS3MapTest.java index 4f298010b6..32153f97d1 100644 --- a/s3/src/test/java/org/jclouds/aws/s3/BaseS3MapTest.java +++ b/s3/src/test/java/org/jclouds/aws/s3/BaseS3MapTest.java @@ -33,6 +33,8 @@ import java.io.InputStream; import java.util.Map; import java.util.TreeSet; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.io.IOUtils; import org.jclouds.aws.s3.domain.S3Bucket; @@ -68,7 +70,8 @@ public abstract class BaseS3MapTest extends S3IntegrationTest { @BeforeMethod @Parameters( { "basedir" }) protected void setUpTempDir(String basedir) throws InterruptedException, - ExecutionException, FileNotFoundException, IOException { + ExecutionException, FileNotFoundException, IOException, + TimeoutException { tmpDirectory = basedir + File.separator + "target" + File.separator + "testFiles" + File.separator + getClass().getSimpleName(); new File(tmpDirectory).mkdirs(); @@ -88,7 +91,7 @@ public abstract class BaseS3MapTest extends S3IntegrationTest { .toInputStream("dogma"), "five", IOUtils .toInputStream("emma")); bucket = new S3Bucket(bucketPrefix + ".mimi"); - client.createBucketIfNotExists(bucket).get(); + client.createBucketIfNotExists(bucket).get(10, TimeUnit.SECONDS); map = createMap(context, bucket); map.clear(); } @@ -105,11 +108,11 @@ public abstract class BaseS3MapTest extends S3IntegrationTest { @Test public void testClear() { map.clear(); - assertEquals(map.size(),0); + assertEquals(map.size(), 0); putString("one", "apple"); - assertEquals(map.size(),1); + assertEquals(map.size(), 1); map.clear(); - assertEquals(map.size(),0); + assertEquals(map.size(), 0); } @Test() diff --git a/s3/src/test/java/org/jclouds/aws/s3/S3IntegrationTest.java b/s3/src/test/java/org/jclouds/aws/s3/S3IntegrationTest.java index 233b4636b7..593068aa64 100644 --- a/s3/src/test/java/org/jclouds/aws/s3/S3IntegrationTest.java +++ b/s3/src/test/java/org/jclouds/aws/s3/S3IntegrationTest.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.logging.ConsoleHandler; import java.util.logging.Formatter; import java.util.logging.Handler; @@ -53,29 +54,32 @@ import com.google.inject.Module; @Test(sequential = true) public class S3IntegrationTest { + @BeforeTest + void enableDebug() { + if (debugEnabled()) { + Handler HANDLER = new ConsoleHandler() { + { + setLevel(Level.ALL); + setFormatter(new Formatter() { - private static final Handler HANDLER = new ConsoleHandler() { - { - setLevel(Level.ALL); - setFormatter(new Formatter() { - - @Override - public String format(LogRecord record) { - return String.format("[%tT %-7s] [%-7s] [%s]: %s %s\n", - new Date(record.getMillis()), record.getLevel(), - Thread.currentThread().getName(), record - .getLoggerName(), record.getMessage(), - record.getThrown() == null ? "" : record - .getThrown()); + @Override + public String format(LogRecord record) { + return String.format( + "[%tT %-7s] [%-7s] [%s]: %s %s\n", + new Date(record.getMillis()), record + .getLevel(), Thread.currentThread() + .getName(), record.getLoggerName(), + record.getMessage(), + record.getThrown() == null ? "" : record + .getThrown()); + } + }); } - }); + }; + Logger guiceLogger = Logger.getLogger("org.jclouds"); + guiceLogger.addHandler(HANDLER); + guiceLogger.setLevel(Level.ALL); } - }; - - static { - Logger guiceLogger = Logger.getLogger("org.jclouds"); - guiceLogger.addHandler(HANDLER); - guiceLogger.setLevel(Level.ALL); } String badRequestWhenSourceIsDestBucketOnCopy400 = "InvalidRequestThe Source and Destination may not be the same when the MetadataDirective is Copy.54C77CAF4D42474BSJecknEUUUx88/65VAKbCdKSOCkpuVTeu7ZG9in9x9NTNglGnoxdbALCfS4k/DUZ"; @@ -113,6 +117,10 @@ public class S3IntegrationTest { deleteEverything(); } + protected boolean debugEnabled() { + return true; + } + protected S3Context createS3Context(String AWSAccessKeyId, String AWSSecretAccessKey) { return S3ContextFactory.createS3Context(buildS3Properties( @@ -129,7 +137,7 @@ public class S3IntegrationTest { checkNotNull(AWSSecretAccessKey, "AWSSecretAccessKey")); properties.setProperty(HttpConstants.PROPERTY_HTTP_SECURE, "false"); properties.setProperty(HttpConstants.PROPERTY_HTTP_PORT, "80"); -// properties.setProperty("jclouds.http.sax.debug", "true"); + // properties.setProperty("jclouds.http.sax.debug", "true"); return properties; } @@ -139,21 +147,22 @@ public class S3IntegrationTest { protected void deleteEverything() throws Exception { try { - List buckets = client.getBuckets().get(); + List buckets = client.getBuckets().get(10, + TimeUnit.SECONDS); List> results = new ArrayList>(); for (S3Bucket bucket : buckets) { if (bucket.getName().startsWith(bucketPrefix.toLowerCase())) { - bucket = client.getBucket(bucket).get(); + bucket = client.getBucket(bucket).get(10, TimeUnit.SECONDS); for (S3Object object : bucket.getContents()) { results.add(client .deleteObject(bucket, object.getKey())); } Iterator> iterator = results.iterator(); while (iterator.hasNext()) { - iterator.next().get(); + iterator.next().get(10, TimeUnit.SECONDS); iterator.remove(); } - client.deleteBucket(bucket).get(); + client.deleteBucket(bucket).get(10, TimeUnit.SECONDS); } } @@ -164,7 +173,7 @@ public class S3IntegrationTest { @AfterTest protected void tearDownClient() throws Exception { -// deleteEverything(); + // deleteEverything(); context.close(); context = null; }