addressed all generic issues and Issue 14

git-svn-id: http://jclouds.googlecode.com/svn/trunk@310 3d8758e0-26b5-11de-8745-db77d3ebf521
This commit is contained in:
adrian.f.cole 2009-05-05 12:07:27 +00:00
parent 994295d872
commit e55f31caad
34 changed files with 259 additions and 374 deletions

View File

@ -23,12 +23,12 @@
*/ */
package org.jclouds; package org.jclouds;
import org.apache.commons.io.IOUtils;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import org.apache.commons.io.IOUtils;
/** /**
* // TODO: Adrian: Document this! * // TODO: Adrian: Document this!
* *
@ -37,6 +37,7 @@ import java.util.concurrent.ExecutionException;
public class Utils { public class Utils {
@SuppressWarnings("unchecked")
public static <E extends Exception> void rethrowIfRuntimeOrSameType(Exception e) throws E { public static <E extends Exception> void rethrowIfRuntimeOrSameType(Exception e) throws E {
if (e instanceof ExecutionException) { if (e instanceof ExecutionException) {
Throwable nested = e.getCause(); Throwable nested = e.getCause();

View File

@ -28,7 +28,6 @@ package org.jclouds.command;
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
public interface FutureCommandClient { public interface FutureCommandClient<O extends FutureCommand<?, ?, ?>> {
@SuppressWarnings("unchecked") void submit(O operation);
<O extends FutureCommand> void submit(O operation);
} }

View File

@ -39,31 +39,28 @@ import com.google.inject.assistedinject.Assisted;
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
public abstract class FutureCommandConnectionHandle<C> { public abstract class FutureCommandConnectionHandle<C, O extends FutureCommand<?, ?, ?>> {
protected final BlockingQueue<C> available; protected final BlockingQueue<C> available;
protected final Semaphore maxConnections; protected final Semaphore maxConnections;
protected final Semaphore completed; protected final Semaphore completed;
protected C conn; protected C conn;
@SuppressWarnings("unchecked") protected O command;
protected FutureCommand operation;
@Resource @Resource
protected Logger logger = Logger.NULL; protected Logger logger = Logger.NULL;
@SuppressWarnings("unchecked")
public FutureCommandConnectionHandle(Semaphore maxConnections, public FutureCommandConnectionHandle(Semaphore maxConnections,
@Assisted FutureCommand operation, @Assisted C conn, @Assisted O command, @Assisted C conn, BlockingQueue<C> available)
BlockingQueue<C> available) throws InterruptedException { throws InterruptedException {
this.maxConnections = maxConnections; this.maxConnections = maxConnections;
this.operation = operation; this.command = command;
this.conn = conn; this.conn = conn;
this.available = available; this.available = available;
this.completed = new Semaphore(1); this.completed = new Semaphore(1);
completed.acquire(); completed.acquire();
} }
@SuppressWarnings("unchecked") public O getCommand() {
public FutureCommand getOperation() { return command;
return operation;
} }
public abstract void startConnection(); public abstract void startConnection();
@ -79,7 +76,7 @@ public abstract class FutureCommandConnectionHandle<C> {
logger.trace("%1s - %2d - releasing to pool", conn, conn.hashCode()); logger.trace("%1s - %2d - releasing to pool", conn, conn.hashCode());
available.put(conn); available.put(conn);
conn = null; conn = null;
operation = null; command = null;
completed.release(); completed.release();
} }
@ -94,7 +91,7 @@ public abstract class FutureCommandConnectionHandle<C> {
shutdownConnection(); shutdownConnection();
} finally { } finally {
conn = null; conn = null;
operation = null; command = null;
maxConnections.release(); maxConnections.release();
} }
} }

View File

@ -41,41 +41,38 @@ import com.google.inject.name.Named;
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
public abstract class FutureCommandConnectionPool<C> extends BaseLifeCycle { public abstract class FutureCommandConnectionPool<C, O extends FutureCommand<?, ?, ?>>
extends BaseLifeCycle {
protected final Semaphore allConnections; protected final Semaphore allConnections;
protected final BlockingQueue<C> available; protected final BlockingQueue<C> available;
protected final FutureCommandConnectionHandleFactory<C> futureCommandConnectionHandleFactory; protected final BlockingQueue<O> commandQueue;
protected final FutureCommandConnectionHandleFactory<C, O> futureCommandConnectionHandleFactory;
protected final int maxConnectionReuse; protected final int maxConnectionReuse;
protected final AtomicInteger currentSessionFailures = new AtomicInteger(0); protected final AtomicInteger currentSessionFailures = new AtomicInteger(0);
protected final FutureCommandConnectionRetry<C> futureCommandConnectionRetry;
protected volatile boolean hitBottom = false; protected volatile boolean hitBottom = false;
public FutureCommandConnectionPool( public FutureCommandConnectionPool(
ExecutorService executor, ExecutorService executor,
FutureCommandConnectionRetry<C> futureCommandConnectionRetry,
Semaphore allConnections, Semaphore allConnections,
FutureCommandConnectionHandleFactory<C> futureCommandConnectionHandleFactory, BlockingQueue<O> commandQueue,
FutureCommandConnectionHandleFactory<C, O> futureCommandConnectionHandleFactory,
@Named("maxConnectionReuse") int maxConnectionReuse, @Named("maxConnectionReuse") int maxConnectionReuse,
BlockingQueue<C> available, BaseLifeCycle... dependencies) { BlockingQueue<C> available, BaseLifeCycle... dependencies) {
super(executor, dependencies); super(executor, dependencies);
this.futureCommandConnectionRetry = futureCommandConnectionRetry;
this.allConnections = allConnections; this.allConnections = allConnections;
this.commandQueue = commandQueue;
this.futureCommandConnectionHandleFactory = futureCommandConnectionHandleFactory; this.futureCommandConnectionHandleFactory = futureCommandConnectionHandleFactory;
this.maxConnectionReuse = maxConnectionReuse; this.maxConnectionReuse = maxConnectionReuse;
this.available = available; this.available = available;
} }
@SuppressWarnings("unchecked")
protected void setResponseException(Exception ex, C conn) { protected void setResponseException(Exception ex, C conn) {
FutureCommand command = futureCommandConnectionRetry O command = getHandleFromConnection(conn).getCommand();
.getHandleFromConnection(conn).getOperation();
command.getResponseFuture().setException(ex); command.getResponseFuture().setException(ex);
} }
@SuppressWarnings("unchecked")
protected void cancel(C conn) { protected void cancel(C conn) {
FutureCommand command = futureCommandConnectionRetry O command = getHandleFromConnection(conn).getCommand();
.getHandleFromConnection(conn).getOperation();
command.cancel(true); command.cancel(true);
} }
@ -119,22 +116,49 @@ public abstract class FutureCommandConnectionPool<C> extends BaseLifeCycle {
protected abstract boolean connectionValid(C conn); protected abstract boolean connectionValid(C conn);
public FutureCommandConnectionHandle<C> getHandle( public FutureCommandConnectionHandle<C, O> getHandle(O command)
FutureCommand<?, ?, ?> command) throws InterruptedException, throws InterruptedException, TimeoutException {
TimeoutException {
exceptionIfNotActive(); exceptionIfNotActive();
C conn = getConnection(); C conn = getConnection();
FutureCommandConnectionHandle<C> handle = futureCommandConnectionHandleFactory FutureCommandConnectionHandle<C, O> handle = futureCommandConnectionHandleFactory
.create(command, conn); .create(command, conn);
futureCommandConnectionRetry associateHandleWithConnection(handle, conn);
.associateHandleWithConnection(handle, conn);
return handle; return handle;
} }
protected void resubmitCommand(C connection) {
O command = getCommandFromConnection(connection);
if (command != null) {
logger.info("resubmitting command: %1s", command);
commandQueue.add(command);
}
}
O getCommandFromConnection(C connection) {
FutureCommandConnectionHandle<C, O> handle = getHandleFromConnection(connection);
if (handle != null && handle.getCommand() != null) {
return handle.getCommand();
}
return null;
}
protected void setExceptionOnCommand(C connection, Exception e) {
FutureCommand<?, ?, ?> command = getCommandFromConnection(connection);
if (command != null) {
logger.warn(e, "exception in command: %1s", command);
command.setException(e);
}
}
protected abstract void associateHandleWithConnection(
FutureCommandConnectionHandle<C, O> handle, C connection);
protected abstract FutureCommandConnectionHandle<C, O> getHandleFromConnection(
C connection);
protected abstract void createNewConnection() throws InterruptedException; protected abstract void createNewConnection() throws InterruptedException;
public interface FutureCommandConnectionHandleFactory<C> { public interface FutureCommandConnectionHandleFactory<C, O extends FutureCommand<?, ?, ?>> {
@SuppressWarnings("unchecked") FutureCommandConnectionHandle<C, O> create(O command, C conn);
FutureCommandConnectionHandle<C> create(FutureCommand command, C conn);
} }
} }

View File

@ -40,15 +40,15 @@ import com.google.inject.Inject;
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
public class FutureCommandConnectionPoolClient<C> extends BaseLifeCycle public class FutureCommandConnectionPoolClient<C, O extends FutureCommand<?, ?, ?>>
implements FutureCommandClient { extends BaseLifeCycle implements FutureCommandClient<O> {
private final FutureCommandConnectionPool<C> futureCommandConnectionPool; private final FutureCommandConnectionPool<C, O> futureCommandConnectionPool;
private final BlockingQueue<FutureCommand> commandQueue; private final BlockingQueue<O> commandQueue;
@Inject @Inject
public FutureCommandConnectionPoolClient(ExecutorService executor, public FutureCommandConnectionPoolClient(ExecutorService executor,
FutureCommandConnectionPool<C> futureCommandConnectionPool, FutureCommandConnectionPool<C, O> futureCommandConnectionPool,
BlockingQueue<FutureCommand> commandQueue) { BlockingQueue<O> commandQueue) {
super(executor, futureCommandConnectionPool); super(executor, futureCommandConnectionPool);
this.futureCommandConnectionPool = futureCommandConnectionPool; this.futureCommandConnectionPool = futureCommandConnectionPool;
this.commandQueue = commandQueue; this.commandQueue = commandQueue;
@ -66,7 +66,8 @@ public class FutureCommandConnectionPoolClient<C> extends BaseLifeCycle
exception.compareAndSet(null, futureCommandConnectionPool exception.compareAndSet(null, futureCommandConnectionPool
.getException()); .getException());
while (!commandQueue.isEmpty()) { while (!commandQueue.isEmpty()) {
FutureCommand command = commandQueue.remove(); FutureCommand<?, ?, ?> command = (FutureCommand<?, ?, ?>) commandQueue
.remove();
if (command != null) { if (command != null) {
if (exception.get() != null) if (exception.get() != null)
command.setException(exception.get()); command.setException(exception.get());
@ -78,7 +79,7 @@ public class FutureCommandConnectionPoolClient<C> extends BaseLifeCycle
@Override @Override
protected void doWork() throws InterruptedException { protected void doWork() throws InterruptedException {
FutureCommand command = commandQueue.poll(1, TimeUnit.SECONDS); O command = commandQueue.poll(1, TimeUnit.SECONDS);
if (command != null) { if (command != null) {
try { try {
invoke(command); invoke(command);
@ -89,37 +90,37 @@ public class FutureCommandConnectionPoolClient<C> extends BaseLifeCycle
} }
} }
public <O extends FutureCommand> void submit(O operation) { public void submit(O command) {
exceptionIfNotActive(); exceptionIfNotActive();
commandQueue.add(operation); commandQueue.add(command);
} }
protected <O extends FutureCommand> void invoke(O operation) { protected void invoke(O command) {
exceptionIfNotActive(); exceptionIfNotActive();
FutureCommandConnectionHandle<C> connectionHandle = null; FutureCommandConnectionHandle<C, O> connectionHandle = null;
try { try {
connectionHandle = futureCommandConnectionPool.getHandle(operation); connectionHandle = futureCommandConnectionPool.getHandle(command);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger logger
.warn( .warn(
e, e,
"Interrupted getting a connection for operation %1s; retrying", "Interrupted getting a connection for command %1s; retrying",
operation); command);
commandQueue.add(operation); commandQueue.add(command);
return; return;
} catch (TimeoutException e) { } catch (TimeoutException e) {
logger.warn(e, logger.warn(e,
"Timeout getting a connection for operation %1s; retrying", "Timeout getting a connection for command %1s; retrying",
operation); command);
commandQueue.add(operation); commandQueue.add(command);
return; return;
} }
if (connectionHandle == null) { if (connectionHandle == null) {
logger.error( logger.error(
"Failed to obtain connection for operation %1s; retrying", "Failed to obtain connection for command %1s; retrying",
operation); command);
commandQueue.add(operation); commandQueue.add(command);
return; return;
} }
connectionHandle.startConnection(); connectionHandle.startConnection();

View File

@ -1,79 +0,0 @@
/**
*
* Copyright (C) 2009 Adrian Cole <adriancole@jclouds.org>
*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
package org.jclouds.command.pool;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import org.jclouds.command.FutureCommand;
import org.jclouds.logging.Logger;
/**
* // TODO: Adrian: Document this!
*
* @author Adrian Cole
*/
public abstract class FutureCommandConnectionRetry<C> {
protected final BlockingQueue<FutureCommand> commandQueue;
protected final AtomicInteger errors;
@Resource
protected Logger logger = Logger.NULL;
public FutureCommandConnectionRetry(
BlockingQueue<FutureCommand> commandQueue, AtomicInteger errors) {
this.commandQueue = commandQueue;
this.errors = errors;
}
public abstract void associateHandleWithConnection(
FutureCommandConnectionHandle<C> handle, C connection);
public abstract FutureCommandConnectionHandle<C> getHandleFromConnection(
C connection);
public boolean shutdownConnectionAndRetryOperation(C connection) {
FutureCommandConnectionHandle<C> handle = getHandleFromConnection(connection);
if (handle != null) {
try {
logger.info("%1s - shutting down connection", connection);
handle.shutdownConnection();
incrementErrorCountAndRetry(handle.getOperation());
return true;
} catch (IOException e) {
logger.error(e, "%1s - error shutting down connection",
connection);
}
}
return false;
}
public void incrementErrorCountAndRetry(FutureCommand command) {
errors.getAndIncrement();
logger.info("resubmitting command %1s", command);
commandQueue.add(command);
}
}

View File

@ -23,48 +23,51 @@
*/ */
package org.jclouds.command.pool.config; package org.jclouds.command.pool.config;
import com.google.inject.*;
import com.google.inject.name.Named;
import org.jclouds.command.FutureCommand;
import org.jclouds.lifecycle.config.LifeCycleModule;
import org.jclouds.command.pool.PoolConstants;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.jclouds.command.pool.PoolConstants;
import org.jclouds.lifecycle.config.LifeCycleModule;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
/** /**
* // TODO: Adrian: Document this! * // TODO: Adrian: Document this!
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
public abstract class FutureCommandConnectionPoolClientModule<C> extends AbstractModule { public abstract class FutureCommandConnectionPoolClientModule<C> extends
AbstractModule {
protected void configure() { protected void configure() {
install(new LifeCycleModule()); install(new LifeCycleModule());
bind(AtomicInteger.class).toInstance(new AtomicInteger());// max errors bind(AtomicInteger.class).toInstance(new AtomicInteger());// max errors
bind(new TypeLiteral<BlockingQueue<FutureCommand>>() {
}).to(new TypeLiteral<LinkedBlockingQueue<FutureCommand>>() {
}).in(Scopes.SINGLETON);
} }
@Provides @Provides
@Singleton @Singleton
public abstract BlockingQueue<C> provideAvailablePool(@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max) throws Exception; public abstract BlockingQueue<C> provideAvailablePool(
@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max)
throws Exception;
/** /**
* controls production and destruction of real connections. * controls production and destruction of real connections.
* <p/> * <p/>
* aquire before a new connection is created * aquire before a new connection is created release after an error has
* release after an error has occurred * occurred
* *
* @param max * @param max
* @return * @return
* @throws Exception * @throws Exception
*/ */
@Provides @Provides
@Singleton @Singleton
public Semaphore provideTotalConnectionSemaphore(@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max) throws Exception { public Semaphore provideTotalConnectionSemaphore(
return new Semaphore(max, true); @Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTIONS) int max)
throws Exception {
return new Semaphore(max, true);
} }
} }

View File

@ -25,19 +25,22 @@ package org.jclouds.http;
/** /**
* // TODO: Adrian: Document this! * // TODO: Adrian: Document this!
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
public class HttpException extends Exception { public class HttpException extends Exception {
private static final long serialVersionUID = 1L;
public HttpException(String s) { public HttpException(String s) {
super(s); // TODO: Adrian: Customise this generated block super(s);
} }
public HttpException(String s, Throwable throwable) { public HttpException(String s, Throwable throwable) {
super(s, throwable); // TODO: Adrian: Customise this generated block super(s, throwable);
} }
public HttpException(Throwable throwable) { public HttpException(Throwable throwable) {
super(throwable); // TODO: Adrian: Customise this generated block super(throwable);
} }
} }

View File

@ -25,7 +25,6 @@ package org.jclouds.http;
import java.util.List; import java.util.List;
import org.jclouds.command.FutureCommand;
import org.jclouds.command.FutureCommandClient; import org.jclouds.command.FutureCommandClient;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -35,11 +34,11 @@ import com.google.inject.Inject;
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
public interface HttpFutureCommandClient extends FutureCommandClient { public interface HttpFutureCommandClient
extends FutureCommandClient<HttpFutureCommand<?>> {
List<HttpRequestFilter> getRequestFilters(); List<HttpRequestFilter> getRequestFilters();
@Inject @Inject
void setRequestFilters(List<HttpRequestFilter> requestFilters); void setRequestFilters(List<HttpRequestFilter> requestFilters);
<O extends FutureCommand> void submit(O operation);
} }

View File

@ -40,7 +40,6 @@ import javax.annotation.Resource;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.jclouds.Utils; import org.jclouds.Utils;
import org.jclouds.command.FutureCommand;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -71,7 +70,7 @@ public class JavaUrlHttpFutureCommandClient implements HttpFutureCommandClient {
this.target = target; this.target = target;
} }
public <O extends FutureCommand> void submit(O operation) { public void submit(HttpFutureCommand<?> operation) {
HttpRequest request = (HttpRequest) operation.getRequest(); HttpRequest request = (HttpRequest) operation.getRequest();
HttpURLConnection connection = null; HttpURLConnection connection = null;
try { try {

View File

@ -41,6 +41,7 @@ public class CommandFactory {
ParseSax<?> create(ParseSax.HandlerWithResult<?> handler); ParseSax<?> create(ParseSax.HandlerWithResult<?> handler);
} }
@SuppressWarnings("unchecked")
public GetAndParseSax<?> createGetAndParseSax(String uri, public GetAndParseSax<?> createGetAndParseSax(String uri,
ParseSax.HandlerWithResult<?> handler) { ParseSax.HandlerWithResult<?> handler) {
return new GetAndParseSax(uri, parseSaxFactory.create(handler)); return new GetAndParseSax(uri, parseSaxFactory.create(handler));

View File

@ -57,6 +57,7 @@ import org.testng.annotations.Test;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Guice; import com.google.inject.Guice;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
import com.google.inject.name.Names; import com.google.inject.name.Names;
@ -126,7 +127,7 @@ public abstract class BaseHttpFutureCommandClientTest {
}).toInstance(filters); }).toInstance(filters);
} }
}); });
factory = injector.getInstance(CommandFactory.class); factory = injector.getInstance(Key.get(CommandFactory.class));
client = injector.getInstance(HttpFutureCommandClient.class); client = injector.getInstance(HttpFutureCommandClient.class);
closer = injector.getInstance(Closer.class); closer = injector.getInstance(Closer.class);
assert client != null; assert client != null;
@ -187,7 +188,7 @@ public abstract class BaseHttpFutureCommandClientTest {
@Test(invocationCount = 500, timeOut = 1500) @Test(invocationCount = 500, timeOut = 1500)
void testGetAndParseSax() throws MalformedURLException, ExecutionException, void testGetAndParseSax() throws MalformedURLException, ExecutionException,
InterruptedException, TimeoutException { InterruptedException, TimeoutException {
GetAndParseSax getAndParseSax = factory.createGetAndParseSax("/", GetAndParseSax<?> getAndParseSax = factory.createGetAndParseSax("/",
new ParseSax.HandlerWithResult<String>() { new ParseSax.HandlerWithResult<String>() {
@Override @Override
public String getResult() { public String getResult() {

View File

@ -23,46 +23,48 @@
*/ */
package org.jclouds.http.commands.config; package org.jclouds.http.commands.config;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.jclouds.http.HttpFutureCommand; import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.commands.CommandFactory; import org.jclouds.http.commands.CommandFactory;
import org.jclouds.http.commands.callables.xml.ParseSax; import org.jclouds.http.commands.callables.xml.ParseSax;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import com.google.inject.Guice;
import com.google.inject.Injector;
/** /**
* // TODO: Adrian: Document this! * // TODO: Adrian: Document this!
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
@Test @Test
public class HttpCommandsModuleTest { public class HttpCommandsModuleTest {
public void testGetString() { public void testGetString() {
Injector i = Guice.createInjector(new HttpCommandsModule()); Injector i = Guice.createInjector(new HttpCommandsModule());
CommandFactory factory = i.getInstance(CommandFactory.class); CommandFactory factory = i.getInstance(CommandFactory.class);
HttpFutureCommand get = factory.createGetString("/index.html"); HttpFutureCommand<String> get = factory.createGetString("/index.html");
assert get != null; assert get != null;
assert get.getResponseFuture() != null; assert get.getResponseFuture() != null;
} }
public void testHead() { public void testHead() {
Injector i = Guice.createInjector(new HttpCommandsModule()); Injector i = Guice.createInjector(new HttpCommandsModule());
CommandFactory factory = i.getInstance(CommandFactory.class); CommandFactory factory = i.getInstance(CommandFactory.class);
HttpFutureCommand Head = factory.createHead("/index.html"); HttpFutureCommand<Boolean> Head = factory.createHead("/index.html");
assert Head != null; assert Head != null;
assert Head.getResponseFuture() != null; assert Head.getResponseFuture() != null;
} }
public void testGetAndParseXml() { public void testGetAndParseXml() {
Injector i = Guice.createInjector(new HttpCommandsModule()); Injector i = Guice.createInjector(new HttpCommandsModule());
CommandFactory factory = i.getInstance(CommandFactory.class); CommandFactory factory = i.getInstance(CommandFactory.class);
HttpFutureCommand GetAndParseXml = factory.createGetAndParseSax("/index.html", new ParseSax.HandlerWithResult<String>(){ HttpFutureCommand<?> GetAndParseXml = factory.createGetAndParseSax(
public String getResult() { "/index.html", new ParseSax.HandlerWithResult<String>() {
return "hello"; public String getResult() {
} return "hello";
}); }
assert GetAndParseXml != null; });
assert GetAndParseXml.getResponseFuture() != null; assert GetAndParseXml != null;
assert GetAndParseXml.getResponseFuture() != null;
} }
} }

View File

@ -23,20 +23,13 @@
*/ */
package org.jclouds.lifecycle.config; package org.jclouds.lifecycle.config;
import static com.google.inject.matcher.Matchers.*;
import static org.testng.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.jclouds.lifecycle.Closer; import org.jclouds.lifecycle.Closer;
import org.jclouds.logging.Logger;
import org.jclouds.logging.config.BindLoggersAnnotatedWithResource;
import org.jclouds.logging.jdk.JDKLogger;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;

View File

@ -106,6 +106,7 @@ public class BindLoggersAnnotatedWithResourceTest {
} }
public static class C { public static class C {
@SuppressWarnings("unused")
@Inject @Inject
private Logger logger = Logger.NULL; private Logger logger = Logger.NULL;
} }
@ -118,9 +119,11 @@ public class BindLoggersAnnotatedWithResourceTest {
} }
public static class D { public static class D {
@SuppressWarnings("unused")
@Resource @Resource
private Logger logger = Logger.NULL; private Logger logger = Logger.NULL;
@SuppressWarnings("unused")
@Resource @Resource
private Logger blogger; private Logger blogger;

View File

@ -25,20 +25,16 @@ package org.jclouds.http.httpnio.config;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.http.nio.NHttpConnection;
import org.jclouds.command.pool.FutureCommandConnectionRetry;
import org.jclouds.http.HttpConstants; import org.jclouds.http.HttpConstants;
import org.jclouds.http.HttpFutureCommandClient; import org.jclouds.http.HttpFutureCommandClient;
import org.jclouds.http.config.HttpFutureCommandClientModule; import org.jclouds.http.config.HttpFutureCommandClientModule;
import org.jclouds.http.httpnio.config.internal.NonSSLHttpNioConnectionPoolClientModule; import org.jclouds.http.httpnio.config.internal.NonSSLHttpNioConnectionPoolClientModule;
import org.jclouds.http.httpnio.config.internal.SSLHttpNioConnectionPoolClientModule; import org.jclouds.http.httpnio.config.internal.SSLHttpNioConnectionPoolClientModule;
import org.jclouds.http.httpnio.pool.HttpNioConnectionPoolClient; import org.jclouds.http.httpnio.pool.HttpNioConnectionPoolClient;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionRetry;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Named; import com.google.inject.name.Named;
/** /**
@ -59,8 +55,6 @@ public class HttpNioConnectionPoolClientModule extends AbstractModule {
install(new SSLHttpNioConnectionPoolClientModule()); install(new SSLHttpNioConnectionPoolClientModule());
else else
install(new NonSSLHttpNioConnectionPoolClientModule()); install(new NonSSLHttpNioConnectionPoolClientModule());
bind(new TypeLiteral<FutureCommandConnectionRetry<NHttpConnection>>() {
}).to(HttpNioFutureCommandConnectionRetry.class);
bind(HttpFutureCommandClient.class).to( bind(HttpFutureCommandClient.class).to(
HttpNioConnectionPoolClient.class); HttpNioConnectionPoolClient.class);
} }

View File

@ -25,6 +25,7 @@ package org.jclouds.http.httpnio.config.internal;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.http.ConnectionReuseStrategy; import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
@ -48,12 +49,11 @@ import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue; import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost; import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent; import org.apache.http.protocol.RequestUserAgent;
import org.jclouds.command.pool.FutureCommandConnectionRetry;
import org.jclouds.command.pool.PoolConstants; import org.jclouds.command.pool.PoolConstants;
import org.jclouds.command.pool.config.FutureCommandConnectionPoolClientModule; import org.jclouds.command.pool.config.FutureCommandConnectionPoolClientModule;
import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionHandle; import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionHandle;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionPool; import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionPool;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionRetry;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler; import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -112,6 +112,9 @@ public abstract class BaseHttpNioConnectionPoolClientModule extends
protected void configure() { protected void configure() {
super.configure(); super.configure();
bind(new TypeLiteral<BlockingQueue<HttpFutureCommand<?>>>() {
}).to(new TypeLiteral<LinkedBlockingQueue<HttpFutureCommand<?>>>() {
}).in(Scopes.SINGLETON);
bind( bind(
HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class) HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class)
.toProvider( .toProvider(
@ -126,8 +129,6 @@ public abstract class BaseHttpNioConnectionPoolClientModule extends
bind(ConnectionReuseStrategy.class).to( bind(ConnectionReuseStrategy.class).to(
DefaultConnectionReuseStrategy.class).in(Scopes.SINGLETON); DefaultConnectionReuseStrategy.class).in(Scopes.SINGLETON);
bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class); bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class);
bind(FutureCommandConnectionRetry.class).to(
HttpNioFutureCommandConnectionRetry.class);
bind( bind(
HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory.class) HttpNioFutureCommandConnectionPool.FutureCommandConnectionHandleFactory.class)
.toProvider( .toProvider(

View File

@ -29,9 +29,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.http.nio.NHttpConnection; import org.apache.http.nio.NHttpConnection;
import org.jclouds.command.FutureCommand;
import org.jclouds.command.pool.FutureCommandConnectionPoolClient; import org.jclouds.command.pool.FutureCommandConnectionPoolClient;
import org.jclouds.http.HttpException; import org.jclouds.http.HttpException;
import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.HttpFutureCommandClient; import org.jclouds.http.HttpFutureCommandClient;
import org.jclouds.http.HttpRequest; import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpRequestFilter; import org.jclouds.http.HttpRequestFilter;
@ -45,9 +45,10 @@ import com.google.inject.Singleton;
* @author Adrian Cole * @author Adrian Cole
*/ */
@Singleton @Singleton
public class HttpNioConnectionPoolClient extends public class HttpNioConnectionPoolClient
FutureCommandConnectionPoolClient<NHttpConnection> implements extends
HttpFutureCommandClient { FutureCommandConnectionPoolClient<NHttpConnection, HttpFutureCommand<?>>
implements HttpFutureCommandClient {
private List<HttpRequestFilter> requestFilters = Collections.emptyList(); private List<HttpRequestFilter> requestFilters = Collections.emptyList();
public List<HttpRequestFilter> getRequestFilters() { public List<HttpRequestFilter> getRequestFilters() {
@ -60,15 +61,15 @@ public class HttpNioConnectionPoolClient extends
} }
@Override @Override
protected <O extends FutureCommand> void invoke(O operation) { protected void invoke(HttpFutureCommand<?> command) {
HttpRequest request = (HttpRequest) operation.getRequest(); HttpRequest request = (HttpRequest) command.getRequest();
try { try {
for (HttpRequestFilter filter : getRequestFilters()) { for (HttpRequestFilter filter : getRequestFilters()) {
filter.filter(request); filter.filter(request);
} }
super.invoke(operation); super.invoke(command);
} catch (HttpException e) { } catch (HttpException e) {
operation.setException(e); command.setException(e);
} }
} }
@ -76,7 +77,7 @@ public class HttpNioConnectionPoolClient extends
public HttpNioConnectionPoolClient( public HttpNioConnectionPoolClient(
ExecutorService executor, ExecutorService executor,
HttpNioFutureCommandConnectionPool httpFutureCommandConnectionHandleNHttpConnectionNioFutureCommandConnectionPool, HttpNioFutureCommandConnectionPool httpFutureCommandConnectionHandleNHttpConnectionNioFutureCommandConnectionPool,
BlockingQueue<FutureCommand> commandQueue) { BlockingQueue<HttpFutureCommand<?>> commandQueue) {
super( super(
executor, executor,
httpFutureCommandConnectionHandleNHttpConnectionNioFutureCommandConnectionPool, httpFutureCommandConnectionHandleNHttpConnectionNioFutureCommandConnectionPool,

View File

@ -28,8 +28,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import org.apache.http.nio.NHttpConnection; import org.apache.http.nio.NHttpConnection;
import org.jclouds.command.FutureCommand;
import org.jclouds.command.pool.FutureCommandConnectionHandle; import org.jclouds.command.pool.FutureCommandConnectionHandle;
import org.jclouds.http.HttpFutureCommand;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.Assisted;
@ -40,20 +40,20 @@ import com.google.inject.assistedinject.Assisted;
* @author Adrian Cole * @author Adrian Cole
*/ */
public class HttpNioFutureCommandConnectionHandle extends public class HttpNioFutureCommandConnectionHandle extends
FutureCommandConnectionHandle<NHttpConnection> { FutureCommandConnectionHandle<NHttpConnection, HttpFutureCommand<?>> {
@Inject @Inject
public HttpNioFutureCommandConnectionHandle( public HttpNioFutureCommandConnectionHandle(
BlockingQueue<NHttpConnection> available, Semaphore maxConnections, BlockingQueue<NHttpConnection> available, Semaphore maxConnections,
@Assisted NHttpConnection conn, @Assisted FutureCommand operation) @Assisted NHttpConnection conn,
throws InterruptedException { @Assisted HttpFutureCommand<?> command) throws InterruptedException {
super(maxConnections, operation, conn, available); super(maxConnections, command, conn, available);
} }
public void startConnection() { public void startConnection() {
conn.getContext().setAttribute("operation", operation); conn.getContext().setAttribute("command", command);
logger.trace("invoking %1s on connection %2s", operation, conn); logger.trace("invoking %1s on connection %2s", command, conn);
conn.requestOutput(); conn.requestOutput();
} }

View File

@ -40,9 +40,10 @@ import org.apache.http.nio.reactor.IOReactorStatus;
import org.apache.http.nio.reactor.SessionRequest; import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback; import org.apache.http.nio.reactor.SessionRequestCallback;
import org.jclouds.command.FutureCommand; import org.jclouds.command.FutureCommand;
import org.jclouds.command.pool.FutureCommandConnectionHandle;
import org.jclouds.command.pool.FutureCommandConnectionPool; import org.jclouds.command.pool.FutureCommandConnectionPool;
import org.jclouds.command.pool.FutureCommandConnectionRetry;
import org.jclouds.command.pool.PoolConstants; import org.jclouds.command.pool.PoolConstants;
import org.jclouds.http.HttpFutureCommand;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.name.Named; import com.google.inject.name.Named;
@ -53,7 +54,8 @@ import com.google.inject.name.Named;
* @author Adrian Cole * @author Adrian Cole
*/ */
public class HttpNioFutureCommandConnectionPool extends public class HttpNioFutureCommandConnectionPool extends
FutureCommandConnectionPool<NHttpConnection> implements EventListener { FutureCommandConnectionPool<NHttpConnection, HttpFutureCommand<?>>
implements EventListener {
private final NHttpClientConnectionPoolSessionRequestCallback sessionCallback; private final NHttpClientConnectionPoolSessionRequestCallback sessionCallback;
private final DefaultConnectingIOReactor ioReactor; private final DefaultConnectingIOReactor ioReactor;
@ -65,17 +67,17 @@ public class HttpNioFutureCommandConnectionPool extends
public HttpNioFutureCommandConnectionPool( public HttpNioFutureCommandConnectionPool(
ExecutorService executor, ExecutorService executor,
Semaphore allConnections, Semaphore allConnections,
BlockingQueue<HttpFutureCommand<?>> commandQueue,
BlockingQueue<NHttpConnection> available, BlockingQueue<NHttpConnection> available,
AsyncNHttpClientHandler clientHandler, AsyncNHttpClientHandler clientHandler,
DefaultConnectingIOReactor ioReactor, DefaultConnectingIOReactor ioReactor,
IOEventDispatch dispatch, IOEventDispatch dispatch,
FutureCommandConnectionHandleFactory requestHandleFactory, FutureCommandConnectionHandleFactory requestHandleFactory,
InetSocketAddress target, InetSocketAddress target,
FutureCommandConnectionRetry<NHttpConnection> futureCommandConnectionRetry,
@Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTION_REUSE) int maxConnectionReuse, @Named(PoolConstants.PROPERTY_POOL_MAX_CONNECTION_REUSE) int maxConnectionReuse,
@Named(PoolConstants.PROPERTY_POOL_MAX_SESSION_FAILURES) int maxSessionFailures) { @Named(PoolConstants.PROPERTY_POOL_MAX_SESSION_FAILURES) int maxSessionFailures) {
super(executor, futureCommandConnectionRetry, allConnections, super(executor, allConnections, commandQueue, requestHandleFactory,
requestHandleFactory, maxConnectionReuse, available); maxConnectionReuse, available);
this.ioReactor = ioReactor; this.ioReactor = ioReactor;
this.dispatch = dispatch; this.dispatch = dispatch;
this.target = target; this.target = target;
@ -160,6 +162,20 @@ public class HttpNioFutureCommandConnectionPool extends
} }
} }
@Override
protected void associateHandleWithConnection(
FutureCommandConnectionHandle<NHttpConnection, HttpFutureCommand<?>> handle,
NHttpConnection connection) {
connection.getContext().setAttribute("command-handle", handle);
}
@Override
protected HttpNioFutureCommandConnectionHandle getHandleFromConnection(
NHttpConnection connection) {
return (HttpNioFutureCommandConnectionHandle) connection.getContext()
.getAttribute("command-handle");
}
class NHttpClientConnectionPoolSessionRequestCallback implements class NHttpClientConnectionPoolSessionRequestCallback implements
SessionRequestCallback { SessionRequestCallback {
@ -191,7 +207,7 @@ public class HttpNioFutureCommandConnectionPool extends
private void releaseConnectionAndSetResponseException( private void releaseConnectionAndSetResponseException(
SessionRequest request, Exception e) { SessionRequest request, Exception e) {
allConnections.release(); allConnections.release();
FutureCommand<?, ?, ?> frequest = (FutureCommand<?, ?, ?>) request HttpFutureCommand<?> frequest = (HttpFutureCommand<?>) request
.getAttachment(); .getAttachment();
if (frequest != null) { if (frequest != null) {
logger.error(e, logger.error(e,
@ -240,7 +256,7 @@ public class HttpNioFutureCommandConnectionPool extends
public void connectionTimeout(NHttpConnection conn) { public void connectionTimeout(NHttpConnection conn) {
logger.warn("%1s - %2d - timeout %2d", conn, conn.hashCode(), conn logger.warn("%1s - %2d - timeout %2d", conn, conn.hashCode(), conn
.getSocketTimeout()); .getSocketTimeout());
futureCommandConnectionRetry.shutdownConnectionAndRetryOperation(conn); resubmitCommand(conn);
} }
public void connectionClosed(NHttpConnection conn) { public void connectionClosed(NHttpConnection conn) {
@ -248,32 +264,22 @@ public class HttpNioFutureCommandConnectionPool extends
} }
public void fatalIOException(IOException ex, NHttpConnection conn) { public void fatalIOException(IOException ex, NHttpConnection conn) {
exception.set(ex);
logger.error(ex, "%3s-%1d{%2s} - io error", conn, conn.hashCode(), logger.error(ex, "%3s-%1d{%2s} - io error", conn, conn.hashCode(),
target); target);
if (!futureCommandConnectionRetry resubmitCommand(conn);
.shutdownConnectionAndRetryOperation(conn))
try {
conn.shutdown();
} catch (IOException e) {
logger.error(e,
"%3s-%1d{%2s} - error shutting down connection", conn,
conn.hashCode(), target);
}
} }
public void fatalProtocolException(HttpException ex, NHttpConnection conn) { public void fatalProtocolException(HttpException ex, NHttpConnection conn) {
exception.set(ex);
logger.error(ex, "%3s-%1d{%2s} - http error", conn, conn.hashCode(), logger.error(ex, "%3s-%1d{%2s} - http error", conn, conn.hashCode(),
target); target);
fatalException(ex, conn); setExceptionOnCommand(conn, ex);
} }
public static interface FutureCommandConnectionHandleFactory public static interface FutureCommandConnectionHandleFactory
extends extends
FutureCommandConnectionPool.FutureCommandConnectionHandleFactory<NHttpConnection> { FutureCommandConnectionPool.FutureCommandConnectionHandleFactory<NHttpConnection, HttpFutureCommand<?>> {
HttpNioFutureCommandConnectionHandle create(FutureCommand command, HttpNioFutureCommandConnectionHandle create(
NHttpConnection conn); HttpFutureCommand<?> command, NHttpConnection conn);
} }
} }

View File

@ -1,67 +0,0 @@
/**
*
* Copyright (C) 2009 Adrian Cole <adriancole@jclouds.org>
*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
package org.jclouds.http.httpnio.pool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.nio.NHttpConnection;
import org.jclouds.command.FutureCommand;
import org.jclouds.command.pool.FutureCommandConnectionHandle;
import org.jclouds.command.pool.FutureCommandConnectionRetry;
import com.google.inject.Inject;
public class HttpNioFutureCommandConnectionRetry extends
FutureCommandConnectionRetry<NHttpConnection> {
@Inject
public HttpNioFutureCommandConnectionRetry(
BlockingQueue<FutureCommand> commandQueue, AtomicInteger errors) {
super(commandQueue, errors);
}
@Override
public void associateHandleWithConnection(
FutureCommandConnectionHandle<NHttpConnection> handle,
NHttpConnection connection) {
connection.getContext().setAttribute("operation-handle", handle);
}
@Override
public HttpNioFutureCommandConnectionHandle getHandleFromConnection(
NHttpConnection connection) {
return (HttpNioFutureCommandConnectionHandle) connection.getContext()
.getAttribute("operation-handle");
}
// @Override
// public void incrementErrorCountAndRetry(FutureCommand operation) {
// ((HttpEntityEnclosingRequest)
// operation.getRequest()).removeHeaders(HTTP.CONTENT_LEN);
// ((HttpEntityEnclosingRequest)
// operation.getRequest()).removeHeaders(HTTP.DATE_HEADER);
// super.incrementErrorCountAndRetry(operation);
// }
}

View File

@ -24,6 +24,7 @@
package org.jclouds.http.httpnio.pool; package org.jclouds.http.httpnio.pool;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -31,12 +32,9 @@ import javax.annotation.Resource;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest; import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.entity.ConsumingNHttpEntity; import org.apache.http.nio.entity.ConsumingNHttpEntity;
import org.apache.http.nio.protocol.NHttpRequestExecutionHandler; import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpContext;
import org.jclouds.command.FutureCommand;
import org.jclouds.http.HttpFutureCommand; import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.HttpRequest; import org.jclouds.http.HttpRequest;
import org.jclouds.http.httpnio.HttpNioUtils; import org.jclouds.http.httpnio.HttpNioUtils;
@ -55,7 +53,7 @@ public class HttpNioFutureCommandExecutionHandler implements
@Resource @Resource
protected Logger logger = Logger.NULL; protected Logger logger = Logger.NULL;
private final ConsumingNHttpEntityFactory entityFactory; private final ConsumingNHttpEntityFactory entityFactory;
private final HttpNioFutureCommandConnectionRetry futureOperationRetry; private final BlockingQueue<HttpFutureCommand<?>> commandQueue;
public interface ConsumingNHttpEntityFactory { public interface ConsumingNHttpEntityFactory {
public ConsumingNHttpEntity create(HttpEntity httpEntity); public ConsumingNHttpEntity create(HttpEntity httpEntity);
@ -65,21 +63,20 @@ public class HttpNioFutureCommandExecutionHandler implements
public HttpNioFutureCommandExecutionHandler( public HttpNioFutureCommandExecutionHandler(
ConsumingNHttpEntityFactory entityFactory, ConsumingNHttpEntityFactory entityFactory,
ExecutorService executor, ExecutorService executor,
HttpNioFutureCommandConnectionRetry futureOperationRetry) { BlockingQueue<HttpFutureCommand<?>> commandQueue) {
this.executor = executor; this.executor = executor;
this.entityFactory = entityFactory; this.entityFactory = entityFactory;
this.futureOperationRetry = futureOperationRetry; this.commandQueue = commandQueue;
} }
public void initalizeContext(HttpContext context, Object attachment) { public void initalizeContext(HttpContext context, Object attachment) {
} }
public HttpEntityEnclosingRequest submitRequest(HttpContext context) { public HttpEntityEnclosingRequest submitRequest(HttpContext context) {
HttpFutureCommand operation = (HttpFutureCommand) context HttpFutureCommand<?> command = (HttpFutureCommand<?>) context
.removeAttribute("operation"); .removeAttribute("command");
if (operation != null) { if (command != null) {
// TODO determine why type is lost HttpRequest object = command.getRequest();
HttpRequest object = (HttpRequest) operation.getRequest();
return HttpNioUtils.convertToApacheRequest(object); return HttpNioUtils.convertToApacheRequest(object);
} }
return null; return null;
@ -94,21 +91,19 @@ public class HttpNioFutureCommandExecutionHandler implements
public void handleResponse(HttpResponse response, HttpContext context) public void handleResponse(HttpResponse response, HttpContext context)
throws IOException { throws IOException {
HttpNioFutureCommandConnectionHandle handle = (HttpNioFutureCommandConnectionHandle) context HttpNioFutureCommandConnectionHandle handle = (HttpNioFutureCommandConnectionHandle) context
.removeAttribute("operation-handle"); .removeAttribute("command-handle");
if (handle != null) { if (handle != null) {
try { try {
FutureCommand command = handle.getOperation(); HttpFutureCommand<?> command = handle.getCommand();
int code = response.getStatusLine().getStatusCode(); int code = response.getStatusLine().getStatusCode();
// normal codes for rest commands // normal codes for rest commands
if ((code >= 200 && code < 300) || code == 404) { if ((code >= 200 && code < 300) || code == 404) {
processResponse(response, command); processResponse(response, command);
} else { } else {
if (isRetryable(response)) { if (isRetryable(response)) {
futureOperationRetry commandQueue.add(command);
.shutdownConnectionAndRetryOperation((NHttpClientConnection) context
.getAttribute(ExecutionContext.HTTP_CONNECTION));
} else { } else {
operationFailed(command); commandFailed(command);
} }
} }
} finally { } finally {
@ -116,8 +111,7 @@ public class HttpNioFutureCommandExecutionHandler implements
} }
} else { } else {
throw new IllegalStateException(String.format( throw new IllegalStateException(String.format(
"No operation-handle associated with operation %1s", "No command-handle associated with command %1s", context));
context));
} }
} }
@ -135,14 +129,15 @@ public class HttpNioFutureCommandExecutionHandler implements
} }
} }
protected void operationFailed(FutureCommand command) throws IOException { protected void commandFailed(HttpFutureCommand<?> command)
throws IOException {
String message = String.format("command failed: %1s", command); String message = String.format("command failed: %1s", command);
logger.error(message); logger.error(message);
command.getResponseFuture().setException(new IOException(message)); command.getResponseFuture().setException(new IOException(message));
} }
protected void processResponse(HttpResponse apacheResponse, protected void processResponse(HttpResponse apacheResponse,
FutureCommand command) throws IOException { HttpFutureCommand<?> command) throws IOException {
org.jclouds.http.HttpResponse response = HttpNioUtils org.jclouds.http.HttpResponse response = HttpNioUtils
.convertToJavaCloudsResponse(apacheResponse); .convertToJavaCloudsResponse(apacheResponse);
command.getResponseFuture().setResponse(response); command.getResponseFuture().setResponse(response);
@ -153,7 +148,7 @@ public class HttpNioFutureCommandExecutionHandler implements
public void finalizeContext(HttpContext context) { public void finalizeContext(HttpContext context) {
HttpNioFutureCommandConnectionHandle handle = (HttpNioFutureCommandConnectionHandle) context HttpNioFutureCommandConnectionHandle handle = (HttpNioFutureCommandConnectionHandle) context
.removeAttribute("operation-handle"); .removeAttribute("command-handle");
if (handle != null) { if (handle != null) {
try { try {
handle.cancel(); handle.cancel();

View File

@ -85,6 +85,10 @@ public class JCloudsS3Service extends S3Service {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
/**
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
@Override @Override
protected Map copyObjectImpl(String sourceBucketName, protected Map copyObjectImpl(String sourceBucketName,
String sourceObjectKey, String destinationBucketName, String sourceObjectKey, String destinationBucketName,
@ -206,26 +210,26 @@ public class JCloudsS3Service extends S3Service {
@Override @Override
protected S3Bucket[] listAllBucketsImpl() throws S3ServiceException { protected S3Bucket[] listAllBucketsImpl() throws S3ServiceException {
try { try {
List<org.jclouds.aws.s3.domain.S3Bucket> jcBucketList = List<org.jclouds.aws.s3.domain.S3Bucket> jcBucketList = connection
connection.getBuckets().get( .getBuckets().get(requestTimeoutMilliseconds,
requestTimeoutMilliseconds, TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
ArrayList<org.jets3t.service.model.S3Bucket> jsBucketList = ArrayList<org.jets3t.service.model.S3Bucket> jsBucketList = new ArrayList<org.jets3t.service.model.S3Bucket>();
new ArrayList<org.jets3t.service.model.S3Bucket>(); for (org.jclouds.aws.s3.domain.S3Bucket jcBucket : jcBucketList) {
for (org.jclouds.aws.s3.domain.S3Bucket jcBucket: jcBucketList) { org.jets3t.service.model.S3Bucket jsBucket = new org.jets3t.service.model.S3Bucket(
org.jets3t.service.model.S3Bucket jsBucket = jcBucket.getName());
new org.jets3t.service.model.S3Bucket(jcBucket.getName()); jsBucket.setOwner(new org.jets3t.service.model.S3Owner(jcBucket
jsBucket.setOwner(new org.jets3t.service.model.S3Owner( .getCanonicalUser().getId(), jcBucket
jcBucket.getCanonicalUser().getId(), .getCanonicalUser().getDisplayName()));
jcBucket.getCanonicalUser().getDisplayName())); jsBucketList.add(jsBucket);
jsBucketList.add(jsBucket);
} }
return (org.jets3t.service.model.S3Bucket[]) jsBucketList.toArray( return (org.jets3t.service.model.S3Bucket[]) jsBucketList
new org.jets3t.service.model.S3Bucket[jsBucketList.size()]); .toArray(new org.jets3t.service.model.S3Bucket[jsBucketList
.size()]);
} catch (Exception e) { } catch (Exception e) {
Utils.<S3ServiceException> rethrowIfRuntimeOrSameType(e); Utils.<S3ServiceException> rethrowIfRuntimeOrSameType(e);
throw new S3ServiceException("error listing buckets", e); throw new S3ServiceException("error listing buckets", e);
} }
} }
@Override @Override

View File

@ -25,12 +25,13 @@ package org.jclouds.aws.s3.nio;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.nio.entity.NStringEntity; import org.apache.http.nio.entity.NStringEntity;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionRetry; import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler; import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -49,8 +50,8 @@ public class S3HttpNioFutureCommandExecutionHandler extends
public S3HttpNioFutureCommandExecutionHandler( public S3HttpNioFutureCommandExecutionHandler(
ConsumingNHttpEntityFactory entityFactory, ConsumingNHttpEntityFactory entityFactory,
ExecutorService executor, ExecutorService executor,
HttpNioFutureCommandConnectionRetry futureOperationRetry) { BlockingQueue<HttpFutureCommand<?>> commandQueue) {
super(entityFactory, executor, futureOperationRetry); super(entityFactory, executor, commandQueue);
} }
@Override @Override

View File

@ -30,6 +30,7 @@ import static org.easymock.classextension.EasyMock.replay;
import static org.easymock.classextension.EasyMock.verify; import static org.easymock.classextension.EasyMock.verify;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -37,7 +38,7 @@ import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.StatusLine; import org.apache.http.StatusLine;
import org.apache.http.nio.entity.NStringEntity; import org.apache.http.nio.entity.NStringEntity;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandConnectionRetry; import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler; import org.jclouds.http.httpnio.pool.HttpNioFutureCommandExecutionHandler;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
@ -58,7 +59,7 @@ public class S3HttpNioFutureCommandExecutionHandlerTest {
handler = new S3HttpNioFutureCommandExecutionHandler( handler = new S3HttpNioFutureCommandExecutionHandler(
createMock(HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class), createMock(HttpNioFutureCommandExecutionHandler.ConsumingNHttpEntityFactory.class),
createMock(ExecutorService.class), createMock(ExecutorService.class),
createMock(HttpNioFutureCommandConnectionRetry.class)); new ArrayBlockingQueue<HttpFutureCommand<?>>(1));
response = createMock(HttpResponse.class); response = createMock(HttpResponse.class);
statusline = createMock(StatusLine.class); statusline = createMock(StatusLine.class);
expect(response.getStatusLine()).andReturn(statusline).atLeastOnce(); expect(response.getStatusLine()).andReturn(statusline).atLeastOnce();

View File

@ -38,8 +38,8 @@ import java.util.List;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.jclouds.command.FutureCommand;
import org.jclouds.http.HttpConstants; import org.jclouds.http.HttpConstants;
import org.jclouds.http.HttpFutureCommand;
import org.jclouds.http.HttpFutureCommandClient; import org.jclouds.http.HttpFutureCommandClient;
import org.jclouds.http.HttpRequest; import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpRequestFilter; import org.jclouds.http.HttpRequestFilter;
@ -83,7 +83,7 @@ public class URLFetchServiceClient implements HttpFutureCommandClient {
this.logger.info("configured to connect to target: %1s", target); this.logger.info("configured to connect to target: %1s", target);
} }
public <O extends FutureCommand> void submit(O operation) { public void submit(HttpFutureCommand<?> operation) {
HttpRequest request = (HttpRequest) operation.getRequest(); HttpRequest request = (HttpRequest) operation.getRequest();
HTTPResponse gaeResponse = null; HTTPResponse gaeResponse = null;
try { try {

View File

@ -26,6 +26,7 @@ package com.amazon.s3;
import java.io.File; import java.io.File;
import java.io.InputStream; import java.io.InputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -47,8 +48,10 @@ public class AmazonPerformance extends BasePerformance {
@Override @Override
@BeforeTest @BeforeTest
@Parameters( { S3Constants.PROPERTY_AWS_ACCESSKEYID, S3Constants.PROPERTY_AWS_SECRETACCESSKEY }) @Parameters( { S3Constants.PROPERTY_AWS_ACCESSKEYID,
protected void setUpClient(@Optional String AWSAccessKeyId, @Optional String AWSSecretAccessKey) throws Exception { S3Constants.PROPERTY_AWS_SECRETACCESSKEY })
protected void setUpClient(@Optional String AWSAccessKeyId,
@Optional String AWSSecretAccessKey) throws Exception {
super.setUpClient(AWSAccessKeyId, AWSSecretAccessKey); super.setUpClient(AWSAccessKeyId, AWSSecretAccessKey);
amzClient = new AWSAuthConnection(AWSAccessKeyId, AWSSecretAccessKey, amzClient = new AWSAuthConnection(AWSAccessKeyId, AWSSecretAccessKey,
false); false);
@ -91,7 +94,7 @@ public class AmazonPerformance extends BasePerformance {
protected boolean putByteArray(String bucket, String key, byte[] data, protected boolean putByteArray(String bucket, String key, byte[] data,
String contentType) throws Exception { String contentType) throws Exception {
com.amazon.s3.S3Object object = new com.amazon.s3.S3Object(data, null); com.amazon.s3.S3Object object = new com.amazon.s3.S3Object(data, null);
Map headers = new TreeMap(); Map<String, List<String>> headers = new TreeMap<String, List<String>>();
headers headers
.put("Content-Type", Arrays .put("Content-Type", Arrays
.asList(new String[] { contentType })); .asList(new String[] { contentType }));

View File

@ -97,6 +97,7 @@ public class S3ParserTest extends org.jclouds.aws.s3.commands.S3ParserTest {
assert completer.take().get(); assert completer.take().get();
} }
@SuppressWarnings("unchecked")
@Test @Test
public void testAmazonCanParseListAllMyBuckets() throws IOException { public void testAmazonCanParseListAllMyBuckets() throws IOException {
ListAllMyBucketsResponse response = runAmazonParseListAllMyBuckets(); ListAllMyBucketsResponse response = runAmazonParseListAllMyBuckets();

View File

@ -29,21 +29,20 @@ import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.DateTimeFormatter;
public class DateService { public class DateService {
private DateTimeFormatter headerDateFormat = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'"); private DateTimeFormatter headerDateFormat = DateTimeFormat
private DateTimeFormatter dataDateFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); .forPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'");
public DateTime dateTimeFromXMLFormat(String toParse) { public DateTime dateTimeFromXMLFormat(String toParse) {
//return dataDateFormat.parseDateTime(toParse); // the format is natively parseable from the DateTime constructor
return new DateTime(toParse); return new DateTime(toParse);
} }
public DateTime dateTimeFromHeaderFormat(String toParse) { public DateTime dateTimeFromHeaderFormat(String toParse) {
return headerDateFormat.parseDateTime(toParse); return headerDateFormat.parseDateTime(toParse);
} }
public String timestampAsHeaderString() { public String timestampAsHeaderString() {
return headerDateFormat.print(new DateTime(DateTimeZone.UTC)); return headerDateFormat.print(new DateTime(DateTimeZone.UTC));
} }
} }

View File

@ -56,11 +56,10 @@ public class CopyObjectCallable extends
} }
throw new HttpException("Error copying source " + reason); throw new HttpException("Error copying source " + reason);
} else if (getResponse().getStatusCode() == 200) { } else if (getResponse().getStatusCode() == 200) {
String response;
InputStream content = getResponse().getContent(); InputStream content = getResponse().getContent();
if (content != null) { if (content != null) {
try { try {
response = Utils.toStringAndClose(content); Utils.toStringAndClose(content);
// TODO parse response of format: <CopyObjectResult // TODO parse response of format: <CopyObjectResult
// xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><LastModified>2009-05-02T18:29:48.000Z</LastModified><ETag>&quot;29f1a7935898965c45f756e5f936fad2&quot;</ETag></CopyObjectResult> // xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><LastModified>2009-05-02T18:29:48.000Z</LastModified><ETag>&quot;29f1a7935898965c45f756e5f936fad2&quot;</ETag></CopyObjectResult>
} catch (IOException e) { } catch (IOException e) {

View File

@ -35,7 +35,6 @@ import org.jclouds.aws.s3.internal.GuiceS3Context;
import org.jclouds.aws.s3.internal.LiveS3Connection; import org.jclouds.aws.s3.internal.LiveS3Connection;
import org.jclouds.aws.s3.internal.LiveS3InputStreamMap; import org.jclouds.aws.s3.internal.LiveS3InputStreamMap;
import org.jclouds.aws.s3.internal.LiveS3ObjectMap; import org.jclouds.aws.s3.internal.LiveS3ObjectMap;
import org.jclouds.aws.s3.internal.GuiceS3Context.S3ObjectMapFactory;
import org.jclouds.http.HttpRequestFilter; import org.jclouds.http.HttpRequestFilter;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;

View File

@ -31,7 +31,6 @@ import org.jclouds.http.HttpRequestFilter;
* @author Adrian Cole * @author Adrian Cole
*/ */
public class RemoveTransferEncodingHeader implements HttpRequestFilter { public class RemoveTransferEncodingHeader implements HttpRequestFilter {
private final static String errorFromAmazonIfYouDontRemove = "<Error><Code>NotImplemented</Code><Message>A header you provided implies functionality that is not implemented</Message><Header>Transfer-Encoding</Header><RequestId>7C59925D75D15561</RequestId><HostId>fbskVU51OZJg2yZS/wNIxoE2PmCf0ZqFd0iH6Vrzw0uKG3KmokswBytL/Bfp/GWb</HostId></Error>";
public void filter(org.jclouds.http.HttpRequest request) public void filter(org.jclouds.http.HttpRequest request)
throws org.jclouds.http.HttpException { throws org.jclouds.http.HttpException {

View File

@ -82,6 +82,7 @@ public class S3IntegrationTest {
} }
} }
String errorFromAmazonIfYouDontRemoveTransferEncodingHeader = "<Error><Code>NotImplemented</Code><Message>A header you provided implies functionality that is not implemented</Message><Header>Transfer-Encoding</Header><RequestId>7C59925D75D15561</RequestId><HostId>fbskVU51OZJg2yZS/wNIxoE2PmCf0ZqFd0iH6Vrzw0uKG3KmokswBytL/Bfp/GWb</HostId></Error>";
String badRequestWhenSourceIsDestBucketOnCopy400 = "<Error><Code>InvalidRequest</Code><Message>The Source and Destination may not be the same when the MetadataDirective is Copy.</Message><RequestId>54C77CAF4D42474B</RequestId><HostId>SJecknEUUUx88/65VAKbCdKSOCkpuVTeu7ZG9in9x9NTNglGnoxdbALCfS4k/DUZ</HostId></Error>"; String badRequestWhenSourceIsDestBucketOnCopy400 = "<Error><Code>InvalidRequest</Code><Message>The Source and Destination may not be the same when the MetadataDirective is Copy.</Message><RequestId>54C77CAF4D42474B</RequestId><HostId>SJecknEUUUx88/65VAKbCdKSOCkpuVTeu7ZG9in9x9NTNglGnoxdbALCfS4k/DUZ</HostId></Error>";
String noSuchSourceKeyOrBucketOnCopy404 = "<Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message><Key>null</Key><RequestId>9CCDF1DACA78B36F</RequestId><HostId>63cqk9YsTFBVfBfks840JVGsepPEdQM42mU+r7HN35sF4Nk5xAcWDEUPaQpK2eFU</HostId></Error>"; String noSuchSourceKeyOrBucketOnCopy404 = "<Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message><Key>null</Key><RequestId>9CCDF1DACA78B36F</RequestId><HostId>63cqk9YsTFBVfBfks840JVGsepPEdQM42mU+r7HN35sF4Nk5xAcWDEUPaQpK2eFU</HostId></Error>";
String noSuchDestinationBucketOnCopy404 = "<Error><Code>NoSuchBucket</Code><Message>The specified bucket does not exist</Message><BucketName>copydestination</BucketName><RequestId>4F0CF319C5535975</RequestId><HostId>hdZyHOm7VK+JI2UCdye3d6TVkKhRBIoWflldXVDTKbgipYlamy8HgPBzHrUAVQNJ</HostId></Error>"; String noSuchDestinationBucketOnCopy404 = "<Error><Code>NoSuchBucket</Code><Message>The specified bucket does not exist</Message><BucketName>copydestination</BucketName><RequestId>4F0CF319C5535975</RequestId><HostId>hdZyHOm7VK+JI2UCdye3d6TVkKhRBIoWflldXVDTKbgipYlamy8HgPBzHrUAVQNJ</HostId></Error>";

View File

@ -66,6 +66,7 @@ public abstract class BaseGoogleAppEngineTest {
Thread.sleep(7 * 1000); Thread.sleep(7 * 1000);
} }
@SuppressWarnings("deprecation")
@AfterTest @AfterTest
public void stopDevAppServer() throws Exception { public void stopDevAppServer() throws Exception {
server.stop(); server.stop();