Issue 9: better javadoc

git-svn-id: http://jclouds.googlecode.com/svn/trunk@883 3d8758e0-26b5-11de-8745-db77d3ebf521
This commit is contained in:
adrian.f.cole 2009-05-30 10:47:35 +00:00
parent fd3c3841ee
commit d9ff753ac4
5 changed files with 303 additions and 287 deletions

View File

@ -40,103 +40,115 @@ import com.google.inject.Inject;
*
* @author Adrian Cole
*/
public class FutureCommandConnectionPoolClient<C, O extends FutureCommand<?, ?, ?>>
extends BaseLifeCycle implements FutureCommandClient<O> {
private final FutureCommandConnectionPool<C, O> futureCommandConnectionPool;
private final BlockingQueue<O> commandQueue;
public class FutureCommandConnectionPoolClient<C, O extends FutureCommand<?, ?, ?>> extends
BaseLifeCycle implements FutureCommandClient<O> {
private final FutureCommandConnectionPool<C, O> futureCommandConnectionPool;
private final BlockingQueue<O> commandQueue;
@Inject
public FutureCommandConnectionPoolClient(ExecutorService executor,
FutureCommandConnectionPool<C, O> futureCommandConnectionPool,
BlockingQueue<O> commandQueue) {
super(executor, futureCommandConnectionPool);
this.futureCommandConnectionPool = futureCommandConnectionPool;
this.commandQueue = commandQueue;
}
@Inject
public FutureCommandConnectionPoolClient(ExecutorService executor,
FutureCommandConnectionPool<C, O> futureCommandConnectionPool,
BlockingQueue<O> commandQueue) {
super(executor, futureCommandConnectionPool);
this.futureCommandConnectionPool = futureCommandConnectionPool;
this.commandQueue = commandQueue;
}
@Override
protected boolean shouldDoWork() {
return super.shouldDoWork()
&& futureCommandConnectionPool.getStatus()
.equals(Status.ACTIVE);
}
/**
* {@inheritDoc}
* <p/>
* we continue while the connection pool is active
*/
@Override
protected boolean shouldDoWork() {
return super.shouldDoWork() && futureCommandConnectionPool.getStatus().equals(Status.ACTIVE);
}
@Override
protected void doShutdown() {
exception.compareAndSet(null, futureCommandConnectionPool
.getException());
while (!commandQueue.isEmpty()) {
FutureCommand<?, ?, ?> command = (FutureCommand<?, ?, ?>) commandQueue
.remove();
if (command != null) {
if (exception.get() != null)
command.setException(exception.get());
else
command.cancel(true);
}
}
}
/**
* {@inheritDoc}
*
* If the reason we are shutting down is due an exception, we set that exception on all pending
* commands. Otherwise, we cancel the pending commands.
*/
@Override
protected void doShutdown() {
exception.compareAndSet(null, futureCommandConnectionPool.getException());
while (!commandQueue.isEmpty()) {
FutureCommand<?, ?, ?> command = (FutureCommand<?, ?, ?>) commandQueue.remove();
if (command != null) {
if (exception.get() != null)
command.setException(exception.get());
else
command.cancel(true);
}
}
}
@Override
protected void doWork() throws InterruptedException {
O command = commandQueue.poll(1, TimeUnit.SECONDS);
if (command != null) {
try {
invoke(command);
} catch (Exception e) {
Utils.<InterruptedException> rethrowIfRuntimeOrSameType(e);
logger.error(e, "Error processing command %s", command);
}
}
}
@Override
protected void doWork() throws InterruptedException {
takeACommandOffTheQueueAndInvokeIt();
}
public void submit(O command) {
exceptionIfNotActive();
commandQueue.add(command);
}
private void takeACommandOffTheQueueAndInvokeIt() throws InterruptedException {
O command = commandQueue.poll(1, TimeUnit.SECONDS);
if (command != null) {
try {
invoke(command);
} catch (Exception e) {
Utils.<InterruptedException> rethrowIfRuntimeOrSameType(e);
logger.error(e, "Error processing command %s", command);
}
}
}
protected void invoke(O command) {
exceptionIfNotActive();
FutureCommandConnectionHandle<C, O> connectionHandle = null;
try {
connectionHandle = futureCommandConnectionPool.getHandle(command);
} catch (InterruptedException e) {
logger
.warn(
e,
"Interrupted getting a connection for command %1$s; retrying",
command);
commandQueue.add(command);
return;
} catch (TimeoutException e) {
logger.warn(e,
"Timeout getting a connection for command %1$s; retrying",
command);
commandQueue.add(command);
return;
}
/**
* This is an asynchronous operation that puts the <code>command</code> onto a queue. Later, it
* will be processed via the {@link #invoke(FutureCommand) invoke} method.
*/
public void submit(O command) {
exceptionIfNotActive();
commandQueue.add(command);
}
if (connectionHandle == null) {
logger.error(
"Failed to obtain connection for command %1$s; retrying",
command);
commandQueue.add(command);
return;
}
connectionHandle.startConnection();
}
/**
* Invoke binds a command with a connection from the pool. This binding is called a
* {@link FutureCommandConnectionHandle handle}. The handle will keep this binding until the
* command's response is parsed or an exception is set on the Command object.
*
* @param command
*/
protected void invoke(O command) {
exceptionIfNotActive();
FutureCommandConnectionHandle<C, O> connectionHandle = null;
try {
connectionHandle = futureCommandConnectionPool.getHandle(command);
} catch (InterruptedException e) {
logger.warn(e, "Interrupted getting a connection for command %1$s; retrying", command);
commandQueue.add(command);
return;
} catch (TimeoutException e) {
logger.warn(e, "Timeout getting a connection for command %1$s; retrying", command);
commandQueue.add(command);
return;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("FutureCommandConnectionPoolClient");
sb.append("{status=").append(status);
sb.append(", commandQueue=").append(
(commandQueue != null) ? commandQueue.size() : 0);
sb.append(", futureCommandConnectionPool=").append(
futureCommandConnectionPool);
sb.append('}');
return sb.toString();
}
if (connectionHandle == null) {
logger.error("Failed to obtain connection for command %1$s; retrying", command);
commandQueue.add(command);
return;
}
connectionHandle.startConnection();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("FutureCommandConnectionPoolClient");
sb.append("{status=").append(status);
sb.append(", commandQueue=").append((commandQueue != null) ? commandQueue.size() : 0);
sb.append(", futureCommandConnectionPool=").append(futureCommandConnectionPool);
sb.append('}');
return sb.toString();
}
}

View File

@ -28,7 +28,8 @@ import org.jclouds.http.commands.callables.xml.ParseSax;
import com.google.inject.Inject;
/**
* // TODO: Adrian: Document this!
* temporary factory until guice can do multi-type assisted inject
* @see <a href="http://code.google.com/p/google-guice/issues/detail?id=346" />
*
* @author Adrian Cole
*/

View File

@ -38,137 +38,137 @@ import org.jclouds.logging.Logger;
* @author Adrian Cole
*/
public abstract class BaseLifeCycle implements Runnable, LifeCycle {
@Resource
protected Logger logger = Logger.NULL;
protected final ExecutorService executor;
protected final BaseLifeCycle[] dependencies;
protected final Object statusLock;
protected volatile Status status;
protected AtomicReference<Exception> exception = new AtomicReference<Exception>();
@Resource
protected Logger logger = Logger.NULL;
protected final ExecutorService executor;
protected final BaseLifeCycle[] dependencies;
protected final Object statusLock;
protected volatile Status status;
protected AtomicReference<Exception> exception = new AtomicReference<Exception>();
public BaseLifeCycle(ExecutorService executor,
BaseLifeCycle... dependencies) {
this.executor = executor;
this.dependencies = dependencies;
this.statusLock = new Object();
this.status = Status.INACTIVE;
}
public BaseLifeCycle(ExecutorService executor, BaseLifeCycle... dependencies) {
this.executor = executor;
this.dependencies = dependencies;
this.statusLock = new Object();
this.status = Status.INACTIVE;
}
public Status getStatus() {
return status;
}
public Status getStatus() {
return status;
}
public void run() {
try {
while (shouldDoWork()) {
doWork();
}
} catch (Exception e) {
logger.error(e, "Exception doing work");
exception.set(e);
}
this.status = Status.SHUTTING_DOWN;
doShutdown();
this.status = Status.SHUT_DOWN;
logger.info("%1$s", this);
}
public void run() {
try {
while (shouldDoWork()) {
doWork();
}
} catch (Exception e) {
logger.error(e, "Exception doing work");
exception.set(e);
}
this.status = Status.SHUTTING_DOWN;
doShutdown();
this.status = Status.SHUT_DOWN;
logger.info("%1$s", this);
}
protected abstract void doWork() throws Exception;
protected abstract void doWork() throws Exception;
protected abstract void doShutdown();
protected abstract void doShutdown();
protected boolean shouldDoWork() {
try {
exceptionIfDepedenciesNotActive();
} catch (IllegalStateException e) {
return false;
}
return status.equals(Status.ACTIVE) && exception.get() == null;
}
/**
* @return false if any dependencies are inactive, or we are inactive, or we have a global
* exception.
*/
protected boolean shouldDoWork() {
try {
exceptionIfDepedenciesNotActive();
} catch (IllegalStateException e) {
return false;
}
return status.equals(Status.ACTIVE) && exception.get() == null;
}
@PostConstruct
public void start() {
logger.info("starting %1$s", this);
synchronized (this.statusLock) {
if (this.status.compareTo(Status.SHUTDOWN_REQUEST) >= 0) {
doShutdown();
this.status = Status.SHUT_DOWN;
this.statusLock.notifyAll();
return;
}
if (this.status.compareTo(Status.ACTIVE) == 0) {
this.statusLock.notifyAll();
return;
}
@PostConstruct
public void start() {
logger.info("starting %1$s", this);
synchronized (this.statusLock) {
if (this.status.compareTo(Status.SHUTDOWN_REQUEST) >= 0) {
doShutdown();
this.status = Status.SHUT_DOWN;
this.statusLock.notifyAll();
return;
}
if (this.status.compareTo(Status.ACTIVE) == 0) {
this.statusLock.notifyAll();
return;
}
if (this.status.compareTo(Status.INACTIVE) != 0) {
throw new IllegalStateException("Illegal state: " + this.status);
}
if (this.status.compareTo(Status.INACTIVE) != 0) {
throw new IllegalStateException("Illegal state: " + this.status);
}
exceptionIfDepedenciesNotActive();
exceptionIfDepedenciesNotActive();
this.status = Status.ACTIVE;
}
executor.execute(this);
}
this.status = Status.ACTIVE;
}
executor.execute(this);
}
protected void exceptionIfDepedenciesNotActive() {
for (BaseLifeCycle dependency : dependencies) {
if (dependency.status.compareTo(Status.ACTIVE) != 0) {
throw new IllegalStateException(String.format(
"Illegal state: %1$s for component: %2$s",
dependency.status, dependency));
}
}
}
protected void exceptionIfDepedenciesNotActive() {
for (BaseLifeCycle dependency : dependencies) {
if (dependency.status.compareTo(Status.ACTIVE) != 0) {
throw new IllegalStateException(String.format(
"Illegal state: %1$s for component: %2$s", dependency.status, dependency));
}
}
}
public Exception getException() {
return this.exception.get();
}
public Exception getException() {
return this.exception.get();
}
protected void awaitShutdown(long timeout) throws InterruptedException {
awaitStatus(Status.SHUT_DOWN, timeout);
}
protected void awaitShutdown(long timeout) throws InterruptedException {
awaitStatus(Status.SHUT_DOWN, timeout);
}
protected void awaitStatus(Status intended, long timeout)
throws InterruptedException {
synchronized (this.statusLock) {
long deadline = System.currentTimeMillis() + timeout;
long remaining = timeout;
while (this.status != intended) {
this.statusLock.wait(remaining);
if (timeout > 0) {
remaining = deadline - System.currentTimeMillis();
if (remaining <= 0) {
break;
}
}
}
}
}
protected void awaitStatus(Status intended, long timeout) throws InterruptedException {
synchronized (this.statusLock) {
long deadline = System.currentTimeMillis() + timeout;
long remaining = timeout;
while (this.status != intended) {
this.statusLock.wait(remaining);
if (timeout > 0) {
remaining = deadline - System.currentTimeMillis();
if (remaining <= 0) {
break;
}
}
}
}
}
@PreDestroy
public void shutdown() {
shutdown(2000);
}
@PreDestroy
public void shutdown() {
shutdown(2000);
}
public void shutdown(long waitMs) {
synchronized (this.statusLock) {
if (this.status.compareTo(Status.ACTIVE) > 0) {
return;
}
this.status = Status.SHUTDOWN_REQUEST;
try {
awaitShutdown(waitMs);
} catch (InterruptedException ignore) {
}
}
}
public void shutdown(long waitMs) {
synchronized (this.statusLock) {
if (this.status.compareTo(Status.ACTIVE) > 0) {
return;
}
this.status = Status.SHUTDOWN_REQUEST;
try {
awaitShutdown(waitMs);
} catch (InterruptedException ignore) {
}
}
}
protected void exceptionIfNotActive() {
if (!status.equals(Status.ACTIVE))
throw new IllegalStateException(String.format("not active: %1$s",
this));
}
protected void exceptionIfNotActive() {
if (!status.equals(Status.ACTIVE))
throw new IllegalStateException(String.format("not active: %1$s", this));
}
}

View File

@ -30,21 +30,21 @@ import java.util.Collections;
import java.util.List;
/**
* // TODO: Adrian: Document this!
*
* This will close objects in the reverse order that they were added.
*
* @author Adrian Cole
*/
public class Closer implements Closeable {
List<Closeable> methodsToClose = new ArrayList<Closeable>();
List<Closeable> methodsToClose = new ArrayList<Closeable>();
public void addToClose(Closeable toClose) {
methodsToClose.add(toClose);
}
public void addToClose(Closeable toClose) {
methodsToClose.add(toClose);
}
public void close() throws IOException {
Collections.reverse(methodsToClose);
for (Closeable toClose : methodsToClose) {
toClose.close();
}
}
public void close() throws IOException {
Collections.reverse(methodsToClose);
for (Closeable toClose : methodsToClose) {
toClose.close();
}
}
}

View File

@ -48,82 +48,85 @@ import com.google.inject.spi.TypeEncounter;
import com.google.inject.spi.TypeListener;
/**
* // TODO: Adrian: Document this!
* This associates java lifecycle annotations with guice hooks. For example, we invoke
* {@link PostConstruct} after injection, and Associate {@link PostDestroy} with a global
* {@link Closer} object.
*
* @author Adrian Cole
*/
public class LifeCycleModule extends AbstractModule {
protected void configure() {
final ExecutorService executor = Executors.newCachedThreadPool();
bind(ExecutorService.class).toInstance(executor);
Closer closer = new Closer();
closer.addToClose(new Closeable() {
public void close() throws IOException {
executor.shutdownNow();
}
});
bind(Closer.class).toInstance(closer);
bindPostInjectionInvoke(closer);
}
protected void configure() {
final ExecutorService executor = Executors.newCachedThreadPool();
bind(ExecutorService.class).toInstance(executor);
Closer closer = new Closer();
closer.addToClose(new Closeable() {
public void close() throws IOException {
executor.shutdownNow();
}
});
bind(Closer.class).toInstance(closer);
bindPostInjectionInvoke(closer);
}
protected void bindPostInjectionInvoke(final Closer closer) {
bindListener(any(), new TypeListener() {
public <I> void hear(TypeLiteral<I> injectableType,
TypeEncounter<I> encounter) {
Set<Method> methods = new HashSet<Method>();
Class<? super I> type = injectableType.getRawType();
while (type != null) {
methods.addAll(Arrays.asList(type.getDeclaredMethods()));
type = type.getSuperclass();
}
for (final Method method : methods) {
PostConstruct postConstruct = method
.getAnnotation(PostConstruct.class);
if (postConstruct != null) {
encounter.register(new InjectionListener<I>() {
public void afterInjection(I injectee) {
try {
method.invoke(injectee);
} catch (InvocationTargetException ie) {
Throwable e = ie.getTargetException();
throw new ProvisionException(
e.getMessage(), e);
} catch (IllegalAccessException e) {
throw new ProvisionException(
e.getMessage(), e);
}
}
});
}
protected void bindPostInjectionInvoke(final Closer closer) {
bindListener(any(), new TypeListener() {
public <I> void hear(TypeLiteral<I> injectableType, TypeEncounter<I> encounter) {
Set<Method> methods = new HashSet<Method>();
Class<? super I> type = injectableType.getRawType();
while (type != null) {
methods.addAll(Arrays.asList(type.getDeclaredMethods()));
type = type.getSuperclass();
}
for (final Method method : methods) {
invokePostConstructMethodAfterInjection(encounter, method);
associatePreDestroyWithCloser(closer, encounter, method);
}
}
PreDestroy preDestroy = method
.getAnnotation(PreDestroy.class);
if (preDestroy != null) {
encounter.register(new InjectionListener<I>() {
public void afterInjection(final I injectee) {
closer.addToClose(new Closeable() {
public void close() throws IOException {
try {
method.invoke(injectee);
} catch (InvocationTargetException ie) {
Throwable e = ie
.getTargetException();
throw new IOException(e
.getMessage());
} catch (IllegalAccessException e) {
throw new IOException(e
.getMessage());
}
}
});
private <I> void associatePreDestroyWithCloser(final Closer closer,
TypeEncounter<I> encounter, final Method method) {
PreDestroy preDestroy = method.getAnnotation(PreDestroy.class);
if (preDestroy != null) {
encounter.register(new InjectionListener<I>() {
public void afterInjection(final I injectee) {
closer.addToClose(new Closeable() {
public void close() throws IOException {
try {
method.invoke(injectee);
} catch (InvocationTargetException ie) {
Throwable e = ie.getTargetException();
throw new IOException(e.getMessage());
} catch (IllegalAccessException e) {
throw new IOException(e.getMessage());
}
}
});
}
});
}
}
}
});
}
}
});
}
}
private <I> void invokePostConstructMethodAfterInjection(TypeEncounter<I> encounter,
final Method method) {
PostConstruct postConstruct = method.getAnnotation(PostConstruct.class);
if (postConstruct != null) {
encounter.register(new InjectionListener<I>() {
public void afterInjection(I injectee) {
try {
method.invoke(injectee);
} catch (InvocationTargetException ie) {
Throwable e = ie.getTargetException();
throw new ProvisionException(e.getMessage(), e);
} catch (IllegalAccessException e) {
throw new ProvisionException(e.getMessage(), e);
}
}
});
}
}
});
}
}