issue #1089 synchronous commands no longer spawn threads unless a timeout is specified

This commit is contained in:
adriancole 2013-01-29 04:08:42 -08:00
parent 42ab9c21fb
commit 878313582a
27 changed files with 1050 additions and 531 deletions

View File

@ -0,0 +1,45 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds;
import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.FutureFallback;
/**
* Provides a backup value to replace an earlier exception.
*
* @param <V>
* the result type of the backup value
*
* @author Adrian Cole
* @see FutureFallback
* @since 1.6
*/
@Beta
public interface Fallback<V> extends FutureFallback<V> {
/**
* The exception is provided so that the {@code Fallback} implementation can
* conditionally determine whether to propagate the exception or to attempt
* to recover.
*
* @param t
* the exception that made the call fail.
*/
V createOrPropagate(Throwable t) throws Exception;
}

View File

@ -40,7 +40,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
/** /**
@ -51,123 +50,153 @@ public final class Fallbacks {
private Fallbacks() { private Fallbacks() {
} }
public static final class NullOnNotFoundOr404 implements FutureFallback<Object> { public static final class NullOnNotFoundOr404 implements Fallback<Object> {
public ListenableFuture<Object> create(Throwable t) throws Exception {
@Override return immediateFuture(createOrPropagate(t));
public ListenableFuture<Object> create(Throwable t) {
return valOnNotFoundOr404(null, checkNotNull(t, "throwable"));
} }
} public Object createOrPropagate(Throwable t) throws Exception {
public static final class VoidOnNotFoundOr404 implements FutureFallback<Void> {
@Override
public ListenableFuture<Void> create(Throwable t) {
return valOnNotFoundOr404(null, checkNotNull(t, "throwable")); return valOnNotFoundOr404(null, checkNotNull(t, "throwable"));
} }
} }
public static final class TrueOnNotFoundOr404 implements FutureFallback<Boolean> { public static final class VoidOnNotFoundOr404 implements Fallback<Void> {
public ListenableFuture<Void> create(Throwable t) throws Exception {
return immediateFuture(createOrPropagate(t));
}
@Override public Void createOrPropagate(Throwable t) throws Exception {
public ListenableFuture<Boolean> create(Throwable t) { return valOnNotFoundOr404(null, checkNotNull(t, "throwable"));
}
}
public static final class TrueOnNotFoundOr404 implements Fallback<Boolean> {
public ListenableFuture<Boolean> create(Throwable t) throws Exception {
return immediateFuture(createOrPropagate(t));
}
public Boolean createOrPropagate(Throwable t) throws Exception {
return valOnNotFoundOr404(true, checkNotNull(t, "throwable")); return valOnNotFoundOr404(true, checkNotNull(t, "throwable"));
} }
} }
public static final class FalseOnNotFoundOr404 implements FutureFallback<Boolean> { public static final class FalseOnNotFoundOr404 implements Fallback<Boolean> {
public ListenableFuture<Boolean> create(Throwable t) throws Exception {
return immediateFuture(createOrPropagate(t));
}
@Override public Boolean createOrPropagate(Throwable t) throws Exception {
public ListenableFuture<Boolean> create(Throwable t) {
return valOnNotFoundOr404(false, checkNotNull(t, "throwable")); return valOnNotFoundOr404(false, checkNotNull(t, "throwable"));
} }
} }
public static final class FalseOnNotFoundOr422 implements FutureFallback<Boolean> { public static final class FalseOnNotFoundOr422 implements Fallback<Boolean> {
public ListenableFuture<Boolean> create(Throwable t) throws Exception {
@Override return immediateFuture(createOrPropagate(t));
public ListenableFuture<Boolean> create(Throwable t) {
if (containsResourceNotFoundException(checkNotNull(t, "throwable"))
|| returnValueOnCodeOrNull(t, true, equalTo(422)) != null)
return immediateFuture(false);
throw propagate(t);
} }
public Boolean createOrPropagate(Throwable t) throws Exception {
if (containsResourceNotFoundException(checkNotNull(t, "throwable"))
|| returnValueOnCodeOrNull(t, true, equalTo(422)) != null)
return false;
throw propagate(t);
}
} }
/** /**
* @author Leander Beernaert * @author Leander Beernaert
*/ */
public static final class AbsentOn403Or404Or500 implements FutureFallback<Optional<Object>> { public static final class AbsentOn403Or404Or500 implements Fallback<Optional<Object>> {
@Override public ListenableFuture<Optional<Object>> create(Throwable t) throws Exception {
public ListenableFuture<Optional<Object>> create(Throwable t) { return immediateFuture(createOrPropagate(t));
Boolean returnVal = returnValueOnCodeOrNull(checkNotNull(t, "throwable"), true, in(asList(403, 404, 500)));
if (returnVal != null)
return immediateFuture(Optional.absent());
throw propagate(t);
} }
public Optional<Object> createOrPropagate(Throwable t) throws Exception {
Boolean returnVal = returnValueOnCodeOrNull(checkNotNull(t, "throwable"), true, in(asList(403, 404, 500)));
if (returnVal != null)
return Optional.absent();
throw propagate(t);
}
} }
public static final class EmptyFluentIterableOnNotFoundOr404 implements FutureFallback<FluentIterable<Object>> { public static final class EmptyFluentIterableOnNotFoundOr404 implements Fallback<FluentIterable<Object>> {
@Override public ListenableFuture<FluentIterable<Object>> create(Throwable t) throws Exception {
public ListenableFuture<FluentIterable<Object>> create(Throwable t) { return immediateFuture(createOrPropagate(t));
}
public FluentIterable<Object> createOrPropagate(Throwable t) throws Exception {
return valOnNotFoundOr404(FluentIterable.from(ImmutableSet.of()), checkNotNull(t, "throwable")); return valOnNotFoundOr404(FluentIterable.from(ImmutableSet.of()), checkNotNull(t, "throwable"));
} }
} }
public static final class EmptyIterableWithMarkerOnNotFoundOr404 implements public static final class EmptyIterableWithMarkerOnNotFoundOr404 implements Fallback<IterableWithMarker<Object>> {
FutureFallback<IterableWithMarker<Object>> { public ListenableFuture<IterableWithMarker<Object>> create(Throwable t) throws Exception {
@Override return immediateFuture(createOrPropagate(t));
public ListenableFuture<IterableWithMarker<Object>> create(Throwable t) { }
public IterableWithMarker<Object> createOrPropagate(Throwable t) throws Exception {
return valOnNotFoundOr404(IterableWithMarkers.from(ImmutableSet.of()), checkNotNull(t, "throwable")); return valOnNotFoundOr404(IterableWithMarkers.from(ImmutableSet.of()), checkNotNull(t, "throwable"));
} }
} }
public static final class EmptyPagedIterableOnNotFoundOr404 implements FutureFallback<PagedIterable<Object>> { public static final class EmptyPagedIterableOnNotFoundOr404 implements Fallback<PagedIterable<Object>> {
@Override public ListenableFuture<PagedIterable<Object>> create(Throwable t) throws Exception {
public ListenableFuture<PagedIterable<Object>> create(Throwable t) { return immediateFuture(createOrPropagate(t));
}
public PagedIterable<Object> createOrPropagate(Throwable t) throws Exception {
return valOnNotFoundOr404(PagedIterables.of(IterableWithMarkers.from(ImmutableSet.of())), return valOnNotFoundOr404(PagedIterables.of(IterableWithMarkers.from(ImmutableSet.of())),
checkNotNull(t, "throwable")); checkNotNull(t, "throwable"));
} }
} }
public static final class EmptyListOnNotFoundOr404 implements FutureFallback<ImmutableList<Object>> { // NO_UCD (unused code) public static final class EmptyListOnNotFoundOr404 implements Fallback<ImmutableList<Object>> { // NO_UCD
@Override // (unused
public ListenableFuture<ImmutableList<Object>> create(Throwable t) { // code)
public ListenableFuture<ImmutableList<Object>> create(Throwable t) throws Exception {
return immediateFuture(createOrPropagate(t));
}
public ImmutableList<Object> createOrPropagate(Throwable t) throws Exception {
return valOnNotFoundOr404(ImmutableList.of(), checkNotNull(t, "throwable")); return valOnNotFoundOr404(ImmutableList.of(), checkNotNull(t, "throwable"));
} }
} }
public static final class EmptySetOnNotFoundOr404 implements FutureFallback<ImmutableSet<Object>> { public static final class EmptySetOnNotFoundOr404 implements Fallback<ImmutableSet<Object>> {
@Override public ListenableFuture<ImmutableSet<Object>> create(Throwable t) throws Exception {
public ListenableFuture<ImmutableSet<Object>> create(Throwable t) { return immediateFuture(createOrPropagate(t));
}
public ImmutableSet<Object> createOrPropagate(Throwable t) throws Exception {
return valOnNotFoundOr404(ImmutableSet.of(), checkNotNull(t, "throwable")); return valOnNotFoundOr404(ImmutableSet.of(), checkNotNull(t, "throwable"));
} }
} }
public static final class EmptyMapOnNotFoundOr404 implements FutureFallback<ImmutableMap<Object, Object>> { public static final class EmptyMapOnNotFoundOr404 implements Fallback<ImmutableMap<Object, Object>> {
@Override public ListenableFuture<ImmutableMap<Object, Object>> create(Throwable t) throws Exception {
public ListenableFuture<ImmutableMap<Object, Object>> create(Throwable t) { return immediateFuture(createOrPropagate(t));
}
public ImmutableMap<Object, Object> createOrPropagate(Throwable t) throws Exception {
return valOnNotFoundOr404(ImmutableMap.of(), checkNotNull(t, "throwable")); return valOnNotFoundOr404(ImmutableMap.of(), checkNotNull(t, "throwable"));
} }
} }
public static final class EmptyMultimapOnNotFoundOr404 implements FutureFallback<ImmutableMultimap<Object, Object>> { // NO_UCD (unused code) public static final class EmptyMultimapOnNotFoundOr404 implements Fallback<ImmutableMultimap<Object, Object>> { // NO_UCD
@Override // (unused
public ListenableFuture<ImmutableMultimap<Object, Object>> create(Throwable t) { // code)
public ListenableFuture<ImmutableMultimap<Object, Object>> create(Throwable t) throws Exception {
return immediateFuture(createOrPropagate(t));
}
public ImmutableMultimap<Object, Object> createOrPropagate(Throwable t) throws Exception {
return valOnNotFoundOr404(ImmutableMultimap.of(), checkNotNull(t, "throwable")); return valOnNotFoundOr404(ImmutableMultimap.of(), checkNotNull(t, "throwable"));
} }
} }
public static <T> ListenableFuture<T> valOnNotFoundOr404(T val, Throwable t) { public static <T> T valOnNotFoundOr404(T val, Throwable t) {
if (containsResourceNotFoundException(checkNotNull(t, "throwable")) || contains404(t)) if (containsResourceNotFoundException(checkNotNull(t, "throwable")) || contains404(t))
return immediateFuture(val); return val;
throw propagate(t); throw propagate(t);
} }

View File

@ -38,7 +38,9 @@ import org.jclouds.lifecycle.Closer;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.TimeLimiter;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Provides; import com.google.inject.Provides;
@ -98,6 +100,12 @@ public class ExecutorServiceModule extends AbstractModule {
protected void configure() { // NO_UCD protected void configure() { // NO_UCD
} }
@Provides
@Singleton
TimeLimiter timeLimiter(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor){
return new SimpleTimeLimiter(userExecutor);
}
@Provides @Provides
@Singleton @Singleton
@Named(PROPERTY_USER_THREADS) @Named(PROPERTY_USER_THREADS)

View File

@ -20,15 +20,16 @@ package org.jclouds.fallbacks;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate; import static com.google.common.base.Throwables.propagate;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
import org.jclouds.Fallback;
import org.jclouds.http.HttpResponseException; import org.jclouds.http.HttpResponseException;
import org.jclouds.rest.AuthorizationException; import org.jclouds.rest.AuthorizationException;
import org.jclouds.rest.ResourceNotFoundException; import org.jclouds.rest.ResourceNotFoundException;
import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
/** /**
@ -36,7 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
* @author Adrian Cole * @author Adrian Cole
*/ */
@Singleton @Singleton
public final class MapHttp4xxCodesToExceptions implements FutureFallback<Object> { public final class MapHttp4xxCodesToExceptions implements Fallback<Object> {
private final PropagateIfRetryAfter propagateIfRetryAfter; private final PropagateIfRetryAfter propagateIfRetryAfter;
@ -46,7 +47,12 @@ public final class MapHttp4xxCodesToExceptions implements FutureFallback<Object>
} }
@Override @Override
public ListenableFuture<Object> create(Throwable t) { // NO_UCD public ListenableFuture<Object> create(Throwable t) throws Exception { // NO_UCD
return immediateFuture(createOrPropagate(t));
}
@Override
public Object createOrPropagate(Throwable t) throws Exception {
propagateIfRetryAfter.create(t); // if we pass here, we aren't a retry-after exception propagateIfRetryAfter.create(t); // if we pass here, we aren't a retry-after exception
if (t instanceof HttpResponseException) { if (t instanceof HttpResponseException) {
HttpResponseException responseException = HttpResponseException.class.cast(t); HttpResponseException responseException = HttpResponseException.class.cast(t);

View File

@ -27,15 +27,21 @@ import com.google.common.util.concurrent.ListenableFuture;
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
public interface HttpCommandExecutorService { public interface HttpCommandExecutorService {
/** /**
* Asks the command to build a request relevant for an endpoint that produces responses of * Returns a potentially deferred {@code HttpResponse} from a server responding to the
* generic type {@code HttpResponse}. and invokes it on the endpoint, returning a future * {@code command}. The output {@code ListenableFuture} need not be
* {@linkplain Future#isDone done}, making {@code HttpCommandExecutorService}
* suitable for asynchronous derivations.
* *
* @param command
* that generates requests
* @return {@link Future} containing the response from the {@code endpoint}
*/ */
ListenableFuture<HttpResponse> submit(HttpCommand command); ListenableFuture<HttpResponse> submit(HttpCommand command);
/**
* Returns a {@code HttpResponse} from the server which responded to the
* {@code command}.
*/
HttpResponse invoke(HttpCommand command);
} }

View File

@ -19,10 +19,12 @@
package org.jclouds.http.internal; package org.jclouds.http.internal;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.io.ByteStreams.copy; import static com.google.common.io.ByteStreams.copy;
import static com.google.common.io.ByteStreams.nullOutputStream; import static com.google.common.io.ByteStreams.nullOutputStream;
import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding; import static org.jclouds.http.HttpUtils.checkRequestHasContentLengthOrChunkedEncoding;
import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled; import static org.jclouds.http.HttpUtils.wirePayloadIfEnabled;
import static org.jclouds.util.Throwables2.getFirstThrowableOfType;
import java.io.FilterInputStream; import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
@ -46,7 +48,6 @@ import org.jclouds.http.handlers.DelegatingErrorHandler;
import org.jclouds.http.handlers.DelegatingRetryHandler; import org.jclouds.http.handlers.DelegatingRetryHandler;
import org.jclouds.io.ContentMetadataCodec; import org.jclouds.io.ContentMetadataCodec;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import org.jclouds.util.Throwables2;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
@ -58,11 +59,11 @@ import com.google.common.util.concurrent.ListeningExecutorService;
public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandExecutorService { public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandExecutorService {
protected final HttpUtils utils; protected final HttpUtils utils;
protected final ContentMetadataCodec contentMetadataCodec; protected final ContentMetadataCodec contentMetadataCodec;
private final DelegatingRetryHandler retryHandler; protected final DelegatingRetryHandler retryHandler;
private final IOExceptionRetryHandler ioRetryHandler; protected final IOExceptionRetryHandler ioRetryHandler;
private final DelegatingErrorHandler errorHandler; protected final DelegatingErrorHandler errorHandler;
private final ListeningExecutorService ioExecutor; protected final ListeningExecutorService ioExecutor;
@Resource @Resource
protected Logger logger = Logger.NULL; protected Logger logger = Logger.NULL;
@ -74,9 +75,9 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
@Inject @Inject
protected BaseHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec, protected BaseHttpCommandExecutorService(HttpUtils utils, ContentMetadataCodec contentMetadataCodec,
@Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor, @Named(Constants.PROPERTY_IO_WORKER_THREADS) ListeningExecutorService ioExecutor,
DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler, DelegatingRetryHandler retryHandler, IOExceptionRetryHandler ioRetryHandler,
DelegatingErrorHandler errorHandler, HttpWire wire) { DelegatingErrorHandler errorHandler, HttpWire wire) {
this.utils = checkNotNull(utils, "utils"); this.utils = checkNotNull(utils, "utils");
this.contentMetadataCodec = checkNotNull(contentMetadataCodec, "contentMetadataCodec"); this.contentMetadataCodec = checkNotNull(contentMetadataCodec, "contentMetadataCodec");
this.retryHandler = checkNotNull(retryHandler, "retryHandler"); this.retryHandler = checkNotNull(retryHandler, "retryHandler");
@ -123,12 +124,71 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
} }
@Override
public HttpResponse invoke(HttpCommand command) {
HttpResponse response = null;
for (;;) {
HttpRequest request = command.getCurrentRequest();
Q nativeRequest = null;
try {
for (HttpRequestFilter filter : request.getFilters()) {
request = filter.filter(request);
}
checkRequestHasContentLengthOrChunkedEncoding(request,
"After filtering, the request has neither chunked encoding nor content length: " + request);
logger.debug("Sending request %s: %s", request.hashCode(), request.getRequestLine());
wirePayloadIfEnabled(wire, request);
utils.logRequest(headerLog, request, ">>");
nativeRequest = convert(request);
response = invoke(nativeRequest);
logger.debug("Receiving response %s: %s", request.hashCode(), response.getStatusLine());
utils.logResponse(headerLog, response, "<<");
if (response.getPayload() != null && wire.enabled())
wire.input(response);
nativeRequest = null; // response took ownership of streams
int statusCode = response.getStatusCode();
if (statusCode >= 300) {
if (shouldContinue(command, response))
continue;
else
break;
} else {
break;
}
} catch (Exception e) {
IOException ioe = getFirstThrowableOfType(e, IOException.class);
if (ioe != null && ioRetryHandler.shouldRetryRequest(command, ioe)) {
continue;
}
command.setException(new HttpResponseException(e.getMessage() + " connecting to "
+ command.getCurrentRequest().getRequestLine(), command, null, e));
break;
} finally {
cleanup(nativeRequest);
}
}
if (command.getException() != null)
throw propagate(command.getException());
return response;
}
private boolean shouldContinue(HttpCommand command, HttpResponse response) {
boolean shouldContinue = false;
if (retryHandler.shouldRetryRequest(command, response)) {
shouldContinue = true;
} else {
errorHandler.handleError(command, response);
}
return shouldContinue;
}
@Override @Override
public ListenableFuture<HttpResponse> submit(HttpCommand command) { public ListenableFuture<HttpResponse> submit(HttpCommand command) {
HttpRequest request = command.getCurrentRequest(); HttpRequest request = command.getCurrentRequest();
checkRequestHasContentLengthOrChunkedEncoding(request, checkRequestHasContentLengthOrChunkedEncoding(request,
"if the request has a payload, it must be set to chunked encoding or specify a content length: " "if the request has a payload, it must be set to chunked encoding or specify a content length: " + request);
+ request);
return ioExecutor.submit(new HttpResponseCallable(command)); return ioExecutor.submit(new HttpResponseCallable(command));
} }
@ -140,63 +200,12 @@ public abstract class BaseHttpCommandExecutorService<Q> implements HttpCommandEx
} }
public HttpResponse call() throws Exception { public HttpResponse call() throws Exception {
try {
HttpResponse response = null; return invoke(command);
for (;;) { } finally {
HttpRequest request = command.getCurrentRequest(); if (command.getException() != null)
Q nativeRequest = null; throw command.getException();
try {
for (HttpRequestFilter filter : request.getFilters()) {
request = filter.filter(request);
}
checkRequestHasContentLengthOrChunkedEncoding(request,
"After filtering, the request has neither chunked encoding nor content length: " + request);
logger.debug("Sending request %s: %s", request.hashCode(), request.getRequestLine());
wirePayloadIfEnabled(wire, request);
utils.logRequest(headerLog, request, ">>");
nativeRequest = convert(request);
response = invoke(nativeRequest);
logger.debug("Receiving response %s: %s", request.hashCode(), response.getStatusLine());
utils.logResponse(headerLog, response, "<<");
if (response.getPayload() != null && wire.enabled())
wire.input(response);
nativeRequest = null; // response took ownership of streams
int statusCode = response.getStatusCode();
if (statusCode >= 300) {
if (shouldContinue(response))
continue;
else
break;
} else {
break;
}
} catch (Exception e) {
IOException ioe = Throwables2.getFirstThrowableOfType(e, IOException.class);
if (ioe != null && ioRetryHandler.shouldRetryRequest(command, ioe)) {
continue;
}
command.setException(new HttpResponseException(e.getMessage() + " connecting to "
+ command.getCurrentRequest().getRequestLine(), command, null, e));
break;
} finally {
cleanup(nativeRequest);
}
} }
if (command.getException() != null)
throw command.getException();
return response;
}
private boolean shouldContinue(HttpResponse response) {
boolean shouldContinue = false;
if (retryHandler.shouldRetryRequest(command, response)) {
shouldContinue = true;
} else {
errorHandler.handleError(command, response);
}
return shouldContinue;
} }
@Override @Override

View File

@ -19,7 +19,6 @@
package org.jclouds.rest; package org.jclouds.rest;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Collections2.filter;
import java.util.List; import java.util.List;
@ -29,7 +28,6 @@ import org.jclouds.predicates.Validator;
import org.jclouds.reflect.Invocation; import org.jclouds.reflect.Invocation;
import org.jclouds.rest.annotations.ParamValidators; import org.jclouds.rest.annotations.ParamValidators;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.reflect.Parameter; import com.google.common.reflect.Parameter;

View File

@ -24,10 +24,9 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.Target; import java.lang.annotation.Target;
import com.google.common.util.concurrent.FutureFallback;
/** /**
* Annotates the appropriate {@link FutureFallback} which propagates the exception or returns a valid fallback value. * Annotates the appropriate {@link org.jclouds.Fallback} which propagates
* the exception or returns a valid fallback value.
* *
* @since 1.6 * @since 1.6
* @author Adrian Cole * @author Adrian Cole
@ -35,5 +34,5 @@ import com.google.common.util.concurrent.FutureFallback;
@Target(METHOD) @Target(METHOD)
@Retention(RUNTIME) @Retention(RUNTIME)
public @interface Fallback { public @interface Fallback {
Class<? extends FutureFallback<?>> value(); Class<? extends org.jclouds.Fallback<?>> value();
} }

View File

@ -0,0 +1,55 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy current the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.rest.config;
import org.jclouds.Fallback;
import org.jclouds.reflect.Invocation;
import com.google.common.annotations.Beta;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.google.inject.ImplementedBy;
/**
* Provides the ability to decouple timeouts and fallbacks from what's built-in.
*
* @author Adrian Cole
*/
@Beta
@ImplementedBy(ReadAnnotationsAndProperties.class)
public interface InvocationConfig {
/**
* If this is present, Sync method calls will block up to the specified nanos
* and throw an {@linkplain UncheckedTimeoutException}. If this is not
* present, Sync method calls will be invoked directly, typically through
* {@linkplain HttpCommandExecutorService#invoke}.
*/
Optional<Long> getTimeoutNanos(Invocation in);
/**
* command named used in logging and configuration keys.
*/
String getCommandName(Invocation invocation);
/**
* fallback used for Sync or Async commands.
*/
Fallback<?> getFallback(Invocation invocation);
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy current the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.rest.config;
import static com.google.common.base.Optional.fromNullable;
import static com.google.common.collect.Maps.transformValues;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.jclouds.Constants.PROPERTY_TIMEOUTS_PREFIX;
import static org.jclouds.util.Maps2.transformKeys;
import static org.jclouds.util.Predicates2.startsWith;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.reflect.Invocation;
import org.jclouds.rest.annotations.Fallback;
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.reflect.Invokable;
import com.google.inject.Injector;
@Beta
@Singleton
public class ReadAnnotationsAndProperties implements InvocationConfig {
private final Injector injector;
private final org.jclouds.Fallback<Object> defaultFallback;
private final Map<String, Long> timeouts;
@Inject
ReadAnnotationsAndProperties(Injector injector,
Function<Predicate<String>, Map<String, String>> filterStringsBoundByName,
org.jclouds.Fallback<Object> defaultFallback) {
this.injector = injector;
this.defaultFallback = defaultFallback;
this.timeouts = timeouts(filterStringsBoundByName);
}
@Override
public Optional<Long> getTimeoutNanos(Invocation in) {
String commandName = getCommandName(in);
Optional<Long> defaultMillis = fromNullable(timeouts.get("default"));
Optional<Long> timeoutMillis = fromNullable(timeouts.get(commandName));
Invokable<?, ?> invoked = in.getInvokable();
if (invoked.isAnnotationPresent(Named.class)) {
timeoutMillis = timeoutMillis.or(defaultMillis);
} else {
// TODO: remove old logic once Named annotations are on all methods
String className = invoked.getOwnerType().getRawType().getSimpleName().replace("AsyncClient", "Client")
.replace("AsyncApi", "Api");
timeoutMillis = timeoutMillis.or(fromNullable(timeouts.get(className))).or(defaultMillis);
}
if (timeoutMillis.isPresent())
return Optional.of(MILLISECONDS.toNanos(timeoutMillis.get()));
return Optional.absent();
}
@Override
public String getCommandName(Invocation invocation) {
Invokable<?, ?> invoked = invocation.getInvokable();
if (invoked.isAnnotationPresent(Named.class)) {
return invoked.getAnnotation(Named.class).value();
} else {
// TODO: remove old logic once Named annotations are on all methods
String className = invoked.getOwnerType().getRawType().getSimpleName().replace("AsyncClient", "Client")
.replace("AsyncApi", "Api");
return className + "." + invoked.getName();
}
}
@Override
public org.jclouds.Fallback<?> getFallback(Invocation invocation) {
Fallback fallback = invocation.getInvokable().getAnnotation(Fallback.class);
if (fallback != null) {
return injector.getInstance(fallback.value());
}
return defaultFallback;
}
/**
* override timeout by values configured in properties(in ms)
*/
static Map<String, Long> timeouts(Function<Predicate<String>, Map<String, String>> filterStringsBoundByName) {
Map<String, String> stringBoundWithTimeoutPrefix = filterStringsBoundByName
.apply(startsWith(PROPERTY_TIMEOUTS_PREFIX));
Map<String, Long> longsByName = transformValues(stringBoundWithTimeoutPrefix, new Function<String, Long>() {
public Long apply(String input) {
return Long.valueOf(String.valueOf(input));
}
});
return transformKeys(longsByName, new Function<String, String>() {
public String apply(String input) {
return input.replaceFirst(PROPERTY_TIMEOUTS_PREFIX, "");
}
});
}
}

View File

@ -22,34 +22,34 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.toArray; import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.transformValues;
import static com.google.common.util.concurrent.Atomics.newReference; import static com.google.common.util.concurrent.Atomics.newReference;
import static org.jclouds.Constants.PROPERTY_TIMEOUTS_PREFIX;
import static org.jclouds.reflect.Reflection2.method; import static org.jclouds.reflect.Reflection2.method;
import static org.jclouds.reflect.Reflection2.methods; import static org.jclouds.reflect.Reflection2.methods;
import static org.jclouds.rest.config.BinderUtils.bindHttpApi; import static org.jclouds.rest.config.BinderUtils.bindHttpApi;
import static org.jclouds.util.Maps2.transformKeys;
import static org.jclouds.util.Predicates2.startsWith;
import java.net.Proxy; import java.net.Proxy;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Named;
import javax.inject.Singleton; import javax.inject.Singleton;
import org.jclouds.fallbacks.MapHttp4xxCodesToExceptions;
import org.jclouds.functions.IdentityFunction; import org.jclouds.functions.IdentityFunction;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpResponse;
import org.jclouds.http.functions.config.SaxParserModule; import org.jclouds.http.functions.config.SaxParserModule;
import org.jclouds.internal.FilterStringsBoundToInjectorByName; import org.jclouds.internal.FilterStringsBoundToInjectorByName;
import org.jclouds.json.config.GsonModule; import org.jclouds.json.config.GsonModule;
import org.jclouds.location.config.LocationModule; import org.jclouds.location.config.LocationModule;
import org.jclouds.proxy.ProxyForURI; import org.jclouds.proxy.ProxyForURI;
import org.jclouds.reflect.Invocation;
import org.jclouds.rest.AuthorizationException; import org.jclouds.rest.AuthorizationException;
import org.jclouds.rest.HttpAsyncClient; import org.jclouds.rest.HttpAsyncClient;
import org.jclouds.rest.HttpClient; import org.jclouds.rest.HttpClient;
import org.jclouds.rest.binders.BindToJsonPayloadWrappedWith; import org.jclouds.rest.binders.BindToJsonPayloadWrappedWith;
import org.jclouds.rest.internal.BlockOnFuture; import org.jclouds.rest.internal.RestAnnotationProcessor;
import org.jclouds.rest.internal.TransformerForRequest;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function; import com.google.common.base.Function;
@ -90,6 +90,21 @@ public class RestModule extends AbstractModule {
return seedKnownSync2AsyncInvokables(sync2Async); return seedKnownSync2AsyncInvokables(sync2Async);
} }
/**
* function view of above
*/
@Provides
@Singleton
protected Function<Invocation, Invocation> sync2async(final Cache<Invokable<?, ?>, Invokable<?, ?>> cache) {
return new Function<Invocation, Invocation>() {
public Invocation apply(Invocation in) {
return Invocation.create(
checkNotNull(cache.getIfPresent(in.getInvokable()), "invokable %s not in %s", in.getInvokable(),
cache), in.getArgs());
}
};
}
@VisibleForTesting @VisibleForTesting
static Cache<Invokable<?, ?>, Invokable<?, ?>> seedKnownSync2AsyncInvokables(Map<Class<?>, Class<?>> sync2Async) { static Cache<Invokable<?, ?>, Invokable<?, ?>> seedKnownSync2AsyncInvokables(Map<Class<?>, Class<?>> sync2Async) {
Cache<Invokable<?, ?>, Invokable<?, ?>> sync2AsyncBuilder = CacheBuilder.newBuilder().build(); Cache<Invokable<?, ?>, Invokable<?, ?>> sync2AsyncBuilder = CacheBuilder.newBuilder().build();
@ -133,7 +148,12 @@ public class RestModule extends AbstractModule {
install(new GsonModule()); install(new GsonModule());
install(new SetCaller.Module()); install(new SetCaller.Module());
install(new FactoryModuleBuilder().build(BindToJsonPayloadWrappedWith.Factory.class)); install(new FactoryModuleBuilder().build(BindToJsonPayloadWrappedWith.Factory.class));
install(new FactoryModuleBuilder().build(BlockOnFuture.Factory.class)); bind(new TypeLiteral<Function<HttpRequest, Function<HttpResponse, ?>>>() {
}).to(TransformerForRequest.class);
bind(new TypeLiteral<org.jclouds.Fallback<Object>>() {
}).to(MapHttp4xxCodesToExceptions.class);
bind(new TypeLiteral<Function<Invocation, HttpRequest>>() {
}).to(RestAnnotationProcessor.class);
bind(IdentityFunction.class).toInstance(IdentityFunction.INSTANCE); bind(IdentityFunction.class).toInstance(IdentityFunction.INSTANCE);
bindHttpApi(binder(), HttpClient.class, HttpAsyncClient.class); bindHttpApi(binder(), HttpClient.class, HttpAsyncClient.class);
// this will help short circuit scenarios that can otherwise lock out users // this will help short circuit scenarios that can otherwise lock out users
@ -145,23 +165,4 @@ public class RestModule extends AbstractModule {
}).to(ProxyForURI.class); }).to(ProxyForURI.class);
installLocations(); installLocations();
} }
@Provides
@Singleton
@Named("TIMEOUTS")
protected Map<String, Long> timeouts(Function<Predicate<String>, Map<String, String>> filterStringsBoundByName) {
Map<String, String> stringBoundWithTimeoutPrefix = filterStringsBoundByName
.apply(startsWith(PROPERTY_TIMEOUTS_PREFIX));
Map<String, Long> longsByName = transformValues(stringBoundWithTimeoutPrefix, new Function<String, Long>() {
public Long apply(String input) {
return Long.valueOf(String.valueOf(input));
}
});
return transformKeys(longsByName, new Function<String, String>() {
public String apply(String input) {
return input.replaceFirst(PROPERTY_TIMEOUTS_PREFIX, "");
}
});
}
} }

View File

@ -1,132 +0,0 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.rest.internal;
import static com.google.common.base.Optional.fromNullable;
import static com.google.common.collect.ObjectArrays.concat;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.logging.Logger;
import org.jclouds.reflect.Invocation;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.reflect.Invokable;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.google.inject.assistedinject.Assisted;
public class BlockOnFuture implements Function<ListenableFuture<?>, Object> {
public static interface Factory {
/**
* @param invocation
* context for how the future was created
*/
BlockOnFuture create(Invocation invocation);
}
@Resource
private Logger logger = Logger.NULL;
private final Map<String, Long> timeouts;
private final Invocation invocation;
@Inject
@VisibleForTesting
BlockOnFuture(@Named("TIMEOUTS") Map<String, Long> timeouts, @Assisted Invocation invocation) {
this.timeouts = timeouts;
this.invocation = invocation;
}
@Override
public Object apply(ListenableFuture<?> future) {
Optional<Long> timeoutNanos = timeoutInNanos(invocation.getInvokable(), timeouts);
return block(future, timeoutNanos);
}
private Object block(ListenableFuture<?> future, Optional<Long> timeoutNanos) {
try {
if (timeoutNanos.isPresent()) {
logger.debug(">> blocking on %s for %s", future, timeoutNanos);
return getUninterruptibly(future, timeoutNanos.get(), NANOSECONDS);
} else {
logger.debug(">> blocking on %s", future);
return getUninterruptibly(future);
}
} catch (ExecutionException e) {
throw propagateCause(e);
} catch (TimeoutException e) {
future.cancel(true);
throw new UncheckedTimeoutException(e);
}
}
private static RuntimeException propagateCause(Exception e) {
Throwable cause = e.getCause();
if (cause == null) {
UncheckedExecutionException unchecked = new UncheckedExecutionException(e.getMessage()) {
private static final long serialVersionUID = 1L;
};
unchecked.setStackTrace(e.getStackTrace());
throw unchecked;
}
StackTraceElement[] combined = concat(cause.getStackTrace(), e.getStackTrace(), StackTraceElement.class);
cause.setStackTrace(combined);
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
// The cause is a weird kind of Throwable, so throw the outer exception.
throw new RuntimeException(e);
}
// override timeout by values configured in properties(in ms)
private Optional<Long> timeoutInNanos(Invokable<?, ?> invoked, Map<String, Long> timeouts) {
Optional<Long> defaultMillis = fromNullable(timeouts.get("default"));
Optional<Long> timeoutMillis;
if (invoked.isAnnotationPresent(Named.class)) {
String commandName = invoked.getAnnotation(Named.class).value();
timeoutMillis = fromNullable(timeouts.get(commandName)).or(defaultMillis);
} else {
// TODO: remove old logic, once Named annotations are present on all methods
String className = invoked.getOwnerType().getRawType().getSimpleName().replace("AsyncClient", "Client")
.replace("AsyncApi", "Api");
timeoutMillis = fromNullable(timeouts.get(className + "." + invoked.getName())).or(
fromNullable(timeouts.get(className))).or(defaultMillis);
}
if (timeoutMillis.isPresent())
return Optional.of(TimeUnit.MILLISECONDS.toNanos(timeoutMillis.get()));
return Optional.absent();
}
}

View File

@ -18,7 +18,6 @@
*/ */
package org.jclouds.rest.internal; package org.jclouds.rest.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate; import static com.google.common.base.Throwables.propagate;
import static com.google.common.util.concurrent.Futures.getUnchecked; import static com.google.common.util.concurrent.Futures.getUnchecked;
@ -32,7 +31,6 @@ import org.jclouds.reflect.Invocation;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.reflect.Invokable; import com.google.common.reflect.Invokable;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -45,7 +43,7 @@ public final class InvokeAndCallGetOnFutures<R> implements Function<Invocation,
@Resource @Resource
private Logger logger = Logger.NULL; private Logger logger = Logger.NULL;
private final Cache<Invokable<?, ?>, Invokable<?, ?>> sync2AsyncInvokables; private final Function<Invocation, Invocation> sync2async;
private final R receiver; private final R receiver;
/** /**
@ -55,8 +53,8 @@ public final class InvokeAndCallGetOnFutures<R> implements Function<Invocation,
*/ */
@Inject @Inject
@VisibleForTesting @VisibleForTesting
InvokeAndCallGetOnFutures(Cache<Invokable<?, ?>, Invokable<?, ?>> sync2AsyncInvokables, R receiver) { InvokeAndCallGetOnFutures(Function<Invocation, Invocation> sync2async, R receiver) {
this.sync2AsyncInvokables = sync2AsyncInvokables; this.sync2async = sync2async;
this.receiver = receiver; this.receiver = receiver;
} }
@ -64,8 +62,7 @@ public final class InvokeAndCallGetOnFutures<R> implements Function<Invocation,
@Override @Override
public Object apply(Invocation in) { public Object apply(Invocation in) {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
Invokable target = checkNotNull(sync2AsyncInvokables.getIfPresent(in.getInvokable()), "invokable %s not in %s", Invokable target = sync2async.apply(in).getInvokable();
in.getInvokable(), sync2AsyncInvokables);
Object returnVal; Object returnVal;
try { try {
returnVal = target.invoke(receiver, in.getArgs().toArray()); returnVal = target.invoke(receiver, in.getArgs().toArray());

View File

@ -19,114 +19,240 @@
package org.jclouds.rest.internal; package org.jclouds.rest.internal;
import static com.google.common.base.Objects.equal; import static com.google.common.base.Objects.equal;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Objects.toStringHelper;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.propagate;
import static com.google.common.util.concurrent.Futures.transform; import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.Futures.withFallback; import static com.google.common.util.concurrent.Futures.withFallback;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.jclouds.Constants.PROPERTY_USER_THREADS;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
import org.jclouds.Constants;
import org.jclouds.fallbacks.MapHttp4xxCodesToExceptions;
import org.jclouds.http.HttpCommand; import org.jclouds.http.HttpCommand;
import org.jclouds.http.HttpCommandExecutorService; import org.jclouds.http.HttpCommandExecutorService;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpResponse; import org.jclouds.http.HttpResponse;
import org.jclouds.logging.Logger; import org.jclouds.logging.Logger;
import org.jclouds.reflect.Invocation; import org.jclouds.reflect.Invocation;
import org.jclouds.rest.InvocationContext; import org.jclouds.rest.InvocationContext;
import org.jclouds.rest.annotations.Fallback; import org.jclouds.rest.config.InvocationConfig;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import com.google.common.cache.Cache; import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.reflect.Invokable; import com.google.common.reflect.Invokable;
import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Injector; import com.google.common.util.concurrent.TimeLimiter;
import com.google.common.util.concurrent.UncheckedTimeoutException;
/**
* @author Adrian Cole
*/
public class InvokeHttpMethod implements Function<Invocation, Object> { public class InvokeHttpMethod implements Function<Invocation, Object> {
@Resource @Resource
private Logger logger = Logger.NULL; private Logger logger = Logger.NULL;
private final Injector injector; private final Function<Invocation, Invocation> sync2async;
private final Cache<Invokable<?, ?>, Invokable<?, ?>> sync2AsyncInvokables; private final Function<Invocation, HttpRequest> annotationProcessor;
private final RestAnnotationProcessor annotationProcessor;
private final HttpCommandExecutorService http; private final HttpCommandExecutorService http;
private final TransformerForRequest transformerForRequest;
private final ListeningExecutorService userExecutor; private final ListeningExecutorService userExecutor;
private final BlockOnFuture.Factory blocker; private final TimeLimiter timeLimiter;
private final Function<HttpRequest, Function<HttpResponse, ?>> transformerForRequest;
private final InvocationConfig config;
@Inject @Inject
private InvokeHttpMethod(Injector injector, Cache<Invokable<?, ?>, Invokable<?, ?>> sync2AsyncInvokables, @VisibleForTesting
RestAnnotationProcessor annotationProcessor, HttpCommandExecutorService http, InvokeHttpMethod(Function<Invocation, Invocation> sync2async, Function<Invocation, HttpRequest> annotationProcessor,
TransformerForRequest transformerForRequest, HttpCommandExecutorService http, Function<HttpRequest, Function<HttpResponse, ?>> transformerForRequest,
@Named(Constants.PROPERTY_USER_THREADS) ListeningExecutorService userExecutor, BlockOnFuture.Factory blocker) { TimeLimiter timeLimiter, InvocationConfig config,
this.injector = injector; @Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor) {
this.sync2AsyncInvokables = sync2AsyncInvokables; this.sync2async = sync2async;
this.annotationProcessor = annotationProcessor; this.annotationProcessor = annotationProcessor;
this.http = http; this.http = http;
this.userExecutor = userExecutor; this.userExecutor = userExecutor;
this.blocker = blocker; this.timeLimiter = timeLimiter;
this.transformerForRequest = transformerForRequest; this.transformerForRequest = transformerForRequest;
this.config = config;
} }
private final LoadingCache<Invokable<?, ?>, FutureFallback<?>> fallbacks = CacheBuilder.newBuilder().build(
new CacheLoader<Invokable<?, ?>, FutureFallback<?>>() {
@Override
public FutureFallback<?> load(Invokable<?, ?> key) throws Exception {
Fallback annotation = key.getAnnotation(Fallback.class);
if (annotation != null) {
return injector.getInstance(annotation.value());
}
return injector.getInstance(MapHttp4xxCodesToExceptions.class);
}
});
@Override @Override
public Object apply(Invocation in) { public Object apply(Invocation in) {
if (isFuture(in.getInvokable())) { if (isFuture(in.getInvokable())) {
return createFuture(in); return submit(in);
} }
Invocation async = Invocation.create( Invocation async = toAsync(in);
checkNotNull(sync2AsyncInvokables.getIfPresent(in.getInvokable()), "invokable %s not in %s", Optional<Long> timeoutNanos = config.getTimeoutNanos(async);
in.getInvokable(), sync2AsyncInvokables), in.getArgs()); if (timeoutNanos.isPresent()) {
return invokeWithTimeout(async, timeoutNanos.get());
}
return invoke(async);
}
/**
* submits the {@linkplain HttpCommand} associated with {@code invocation},
* {@link #getTransformer(String, HttpCommand) parses its response}, and
* applies a {@link #getFallback(String, Invocation, HttpCommand) fallback}
* if a {@code Throwable} is encountered. Parsing and Fallback occur on the
* {@code userExecutor} thread.
*/
public ListenableFuture<?> submit(Invocation invocation) {
String commandName = config.getCommandName(invocation);
HttpCommand command = toCommand(commandName, invocation);
Function<HttpResponse, ?> transformer = getTransformer(commandName, command);
org.jclouds.Fallback<?> fallback = getFallback(commandName, invocation, command);
logger.debug(">> submitting %s", commandName);
return withFallback(transform(http.submit(command), transformer, userExecutor), fallback);
}
/**
* invokes the {@linkplain HttpCommand} associated with {@code invocation},
* {@link #getTransformer(String, HttpCommand) parses its response}, and
* applies a {@link #getFallback(String, Invocation, HttpCommand) fallback}
* if a {@code Throwable} is encountered.
*/
public Object invoke(Invocation invocation) {
String commandName = config.getCommandName(invocation);
HttpCommand command = toCommand(commandName, invocation);
Function<HttpResponse, ?> transformer = getTransformer(commandName, command);
org.jclouds.Fallback<?> fallback = getFallback(commandName, invocation, command);
logger.debug(">> invoking %s", commandName);
try {
return transformer.apply(http.invoke(command));
} catch (Throwable t) {
try {
return fallback.createOrPropagate(t);
} catch (Exception e) {
throw propagate(e);
}
}
}
/**
* calls {@link #invoke(Invocation)}, timing out after the specified time
* limit. If the target method call finished before the limit is reached, the
* return value or exception is propagated to the caller exactly as-is. If,
* on the other hand, the time limit is reached, we attempt to abort the call
* to the target, and throw an {@link UncheckedTimeoutException} to the
* caller.
*
* @param invocation
* the Invocation to invoke via {@link #invoke(Invocation)}
* @param limitNanos
* with timeoutUnit, the maximum length of time to wait in
* nanoseconds
* @throws InterruptedException
* if our thread is interrupted during execution
* @throws UncheckedTimeoutException
* if the time limit is reached
* @see TimeLimiter#callWithTimeout(Callable, long, TimeUnit, boolean)
*/
public Object invokeWithTimeout(final Invocation invocation, final long limitNanos) {
String commandName = config.getCommandName(invocation);
HttpCommand command = toCommand(commandName, invocation);
org.jclouds.Fallback<?> fallback = getFallback(commandName, invocation, command);
logger.debug(">> blocking on %s for %s", invocation, limitNanos);
try {
return timeLimiter
.callWithTimeout(new InvokeAndTransform(commandName, command), limitNanos, NANOSECONDS, true);
} catch (Throwable t) {
try {
return fallback.createOrPropagate(t);
} catch (Exception e) {
throw propagate(e);
}
}
}
private org.jclouds.Fallback<?> getFallback(String commandName, Invocation invocation, HttpCommand command) {
HttpRequest request = command.getCurrentRequest();
org.jclouds.Fallback<?> fallback = config.getFallback(invocation);
if (fallback instanceof InvocationContext)
InvocationContext.class.cast(fallback).setContext(request);
logger.trace("<< exceptions from %s are parsed by %s", commandName, fallback.getClass().getSimpleName());
return fallback;
}
@VisibleForTesting
final class InvokeAndTransform implements Callable<Object> {
private final String commandName;
private final HttpCommand command;
private final Function<HttpResponse, ?> transformer;
InvokeAndTransform(String commandName, HttpCommand command) {
this.commandName = commandName;
this.command = command;
this.transformer = getTransformer(commandName, command);
}
@Override
public Object call() throws Exception {
return transformer.apply(http.invoke(command));
}
@Override
public int hashCode() {
return Objects.hashCode(commandName, command, transformer);
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null || getClass() != obj.getClass())
return false;
InvokeAndTransform that = InvokeAndTransform.class.cast(obj);
return equal(this.commandName, that.commandName) && equal(this.command, that.command)
&& equal(this.transformer, that.transformer);
}
@Override
public String toString() {
return toStringHelper(this).add("commandName", commandName).add("command", command)
.add("transformer", transformer).toString();
}
}
/**
* looks up the corresponding {@code Invocation} that returns a
* {@code Future}. Only Invokables that return {@code Futures} are annotated
* in a way that can be parsed into an {@linkplain HttpRequest}.
*/
private Invocation toAsync(Invocation in) {
Invocation async = sync2async.apply(in);
checkState(isFuture(async.getInvokable()), "not a future: %s", async); checkState(isFuture(async.getInvokable()), "not a future: %s", async);
return blocker.create(async).apply(createFuture(async)); return async;
}
private HttpCommand toCommand(String commandName, Invocation invocation) {
logger.trace(">> converting %s", commandName);
HttpRequest request = annotationProcessor.apply(invocation);
logger.trace("<< converted %s to %s", commandName, request.getRequestLine());
return new HttpCommand(request);
}
private Function<HttpResponse, ?> getTransformer(String commandName, HttpCommand command) {
HttpRequest request = command.getCurrentRequest();
Function<HttpResponse, ?> transformer = transformerForRequest.apply(request);
logger.trace("<< response from %s is parsed by %s", commandName, transformer.getClass().getSimpleName());
return transformer;
} }
private boolean isFuture(Invokable<?, ?> in) { private boolean isFuture(Invokable<?, ?> in) {
return in.getReturnType().getRawType().equals(ListenableFuture.class); return in.getReturnType().getRawType().equals(ListenableFuture.class);
} }
public ListenableFuture<?> createFuture(Invocation invocation) {
String name = invocation.getInvokable().toString();
logger.trace(">> converting %s", name);
GeneratedHttpRequest request = annotationProcessor.apply(invocation);
logger.trace("<< converted %s to %s", name, request.getRequestLine());
Function<HttpResponse, ?> transformer = transformerForRequest.apply(request);
logger.trace("<< response from %s is parsed by %s", name, transformer.getClass().getSimpleName());
logger.debug(">> invoking %s", name);
ListenableFuture<?> result = transform(http.submit(new HttpCommand(request)), transformer, userExecutor);
FutureFallback<?> fallback = fallbacks.getUnchecked(invocation.getInvokable());
if (fallback instanceof InvocationContext) {
InvocationContext.class.cast(fallback).setContext(request);
}
logger.trace("<< exceptions from %s are parsed by %s", name, fallback.getClass().getSimpleName());
return withFallback(result, fallback);
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) if (this == o)

View File

@ -131,7 +131,7 @@ import com.google.inject.TypeLiteral;
* *
* @author Adrian Cole * @author Adrian Cole
*/ */
public class RestAnnotationProcessor implements Function<Invocation, GeneratedHttpRequest> { public class RestAnnotationProcessor implements Function<Invocation, HttpRequest> {
@Resource @Resource
protected Logger logger = Logger.NULL; protected Logger logger = Logger.NULL;

View File

@ -35,6 +35,7 @@ import javax.lang.model.type.NullType;
import org.jclouds.functions.IdentityFunction; import org.jclouds.functions.IdentityFunction;
import org.jclouds.functions.OnlyElementOrNull; import org.jclouds.functions.OnlyElementOrNull;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpResponse; import org.jclouds.http.HttpResponse;
import org.jclouds.http.functions.ParseFirstJsonValueNamed; import org.jclouds.http.functions.ParseFirstJsonValueNamed;
import org.jclouds.http.functions.ParseJson; import org.jclouds.http.functions.ParseJson;
@ -69,7 +70,7 @@ import com.google.inject.Injector;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
public class TransformerForRequest implements Function<GeneratedHttpRequest, Function<HttpResponse, ?>> { public class TransformerForRequest implements Function<HttpRequest, Function<HttpResponse, ?>> {
private final ParseSax.Factory parserFactory; private final ParseSax.Factory parserFactory;
private final Injector injector; private final Injector injector;
private final GetAcceptHeaders getAcceptHeaders; private final GetAcceptHeaders getAcceptHeaders;
@ -83,7 +84,8 @@ public class TransformerForRequest implements Function<GeneratedHttpRequest, Fun
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public Function<HttpResponse, ?> apply(GeneratedHttpRequest request) { public Function<HttpResponse, ?> apply(HttpRequest in) {
GeneratedHttpRequest request = GeneratedHttpRequest.class.cast(in);
Function<HttpResponse, ?> transformer; Function<HttpResponse, ?> transformer;
Class<? extends HandlerWithResult<?>> handler = getSaxResponseParserClassOrNull(request.getInvocation() Class<? extends HandlerWithResult<?>> handler = getSaxResponseParserClassOrNull(request.getInvocation()
.getInvokable()); .getInvokable());
@ -93,7 +95,7 @@ public class TransformerForRequest implements Function<GeneratedHttpRequest, Fun
transformer = getTransformerForMethod(request.getInvocation(), injector); transformer = getTransformerForMethod(request.getInvocation(), injector);
} }
if (transformer instanceof InvocationContext<?>) { if (transformer instanceof InvocationContext<?>) {
((InvocationContext<?>) transformer).setContext(request); InvocationContext.class.cast(transformer).setContext(request);
} }
if (request.getInvocation().getInvokable().isAnnotationPresent(Transform.class)) { if (request.getInvocation().getInvokable().isAnnotationPresent(Transform.class)) {
Function<?, ?> wrappingTransformer = injector.getInstance(request.getInvocation().getInvokable() Function<?, ?> wrappingTransformer = injector.getInstance(request.getInvocation().getInvokable()

View File

@ -1,3 +1,21 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy current the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.fallbacks; package org.jclouds.fallbacks;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;

View File

@ -36,27 +36,27 @@ import com.google.common.net.HttpHeaders;
public class MapHttp4xxCodesToExceptionsTest { public class MapHttp4xxCodesToExceptionsTest {
@Test(expectedExceptions = AuthorizationException.class) @Test(expectedExceptions = AuthorizationException.class)
public void test401ToAuthorizationException() { public void test401ToAuthorizationException() throws Exception {
fn.create(new HttpResponseException(command, HttpResponse.builder().statusCode(401).build())); fn.create(new HttpResponseException(command, HttpResponse.builder().statusCode(401).build()));
} }
@Test(expectedExceptions = AuthorizationException.class) @Test(expectedExceptions = AuthorizationException.class)
public void test403ToAuthorizationException() { public void test403ToAuthorizationException() throws Exception {
fn.create(new HttpResponseException(command, HttpResponse.builder().statusCode(403).build())); fn.create(new HttpResponseException(command, HttpResponse.builder().statusCode(403).build()));
} }
@Test(expectedExceptions = ResourceNotFoundException.class) @Test(expectedExceptions = ResourceNotFoundException.class)
public void test404ToResourceNotFoundException() { public void test404ToResourceNotFoundException() throws Exception {
fn.create(new HttpResponseException(command, HttpResponse.builder().statusCode(404).build())); fn.create(new HttpResponseException(command, HttpResponse.builder().statusCode(404).build()));
} }
@Test(expectedExceptions = IllegalStateException.class) @Test(expectedExceptions = IllegalStateException.class)
public void test409ToIllegalStateException() { public void test409ToIllegalStateException() throws Exception {
fn.create(new HttpResponseException(command, HttpResponse.builder().statusCode(409).build())); fn.create(new HttpResponseException(command, HttpResponse.builder().statusCode(409).build()));
} }
@Test(expectedExceptions = RetryAfterException.class, expectedExceptionsMessageRegExp = "retry now") @Test(expectedExceptions = RetryAfterException.class, expectedExceptionsMessageRegExp = "retry now")
public void testHttpResponseExceptionWithRetryAfterDate() { public void testHttpResponseExceptionWithRetryAfterDate() throws Exception {
fn.create(new HttpResponseException(command, fn.create(new HttpResponseException(command,
HttpResponse.builder() HttpResponse.builder()
.statusCode(503) .statusCode(503)
@ -64,7 +64,7 @@ public class MapHttp4xxCodesToExceptionsTest {
} }
@Test(expectedExceptions = RetryAfterException.class, expectedExceptionsMessageRegExp = "retry in 700 seconds") @Test(expectedExceptions = RetryAfterException.class, expectedExceptionsMessageRegExp = "retry in 700 seconds")
public void testHttpResponseExceptionWithRetryAfterOffset(){ public void testHttpResponseExceptionWithRetryAfterOffset() throws Exception {
fn.create(new HttpResponseException(command, fn.create(new HttpResponseException(command,
HttpResponse.builder() HttpResponse.builder()
.statusCode(503) .statusCode(503)
@ -72,7 +72,7 @@ public class MapHttp4xxCodesToExceptionsTest {
} }
@Test(expectedExceptions = RetryAfterException.class, expectedExceptionsMessageRegExp = "retry in 86400 seconds") @Test(expectedExceptions = RetryAfterException.class, expectedExceptionsMessageRegExp = "retry in 86400 seconds")
public void testHttpResponseExceptionWithRetryAfterPastIsZero(){ public void testHttpResponseExceptionWithRetryAfterPastIsZero() throws Exception {
fn.create(new HttpResponseException(command, fn.create(new HttpResponseException(command,
HttpResponse.builder() HttpResponse.builder()
.statusCode(503) .statusCode(503)

View File

@ -52,7 +52,6 @@ import org.jclouds.util.Strings2;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.FutureFallback;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Provides; import com.google.inject.Provides;
@ -92,13 +91,14 @@ public interface IntegrationTestAsyncClient {
@Fallback(FooOnException.class) @Fallback(FooOnException.class)
ListenableFuture<String> downloadException(@PathParam("id") String id, HttpRequestOptions options); ListenableFuture<String> downloadException(@PathParam("id") String id, HttpRequestOptions options);
static class FooOnException implements FutureFallback<String> { static class FooOnException implements org.jclouds.Fallback<String> {
@Override
public ListenableFuture<String> create(Throwable t) throws Exception { public ListenableFuture<String> create(Throwable t) throws Exception {
return immediateFuture("foo"); return immediateFuture("foo");
} }
public String createOrPropagate(Throwable t) throws Exception {
return "foo";
}
} }
@GET @GET

View File

@ -27,6 +27,7 @@ import java.util.Properties;
import org.jclouds.http.BaseJettyTest; import org.jclouds.http.BaseJettyTest;
import org.jclouds.http.HttpCommand; import org.jclouds.http.HttpCommand;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpResponse; import org.jclouds.http.HttpResponse;
import org.jclouds.http.IntegrationTestAsyncClient; import org.jclouds.http.IntegrationTestAsyncClient;
import org.jclouds.io.Payloads; import org.jclouds.io.Payloads;
@ -34,6 +35,7 @@ import org.jclouds.reflect.Invocation;
import org.jclouds.rest.internal.RestAnnotationProcessor; import org.jclouds.rest.internal.RestAnnotationProcessor;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.reflect.Invokable; import com.google.common.reflect.Invokable;
@ -122,7 +124,7 @@ public class BackoffLimitedRetryHandlerTest {
assertEquals(response.getPayload().getInput().read(), -1); assertEquals(response.getPayload().getInput().read(), -1);
} }
private final RestAnnotationProcessor processor = BaseJettyTest.newBuilder(8100, new Properties()).buildInjector() private final Function<Invocation, HttpRequest> processor = BaseJettyTest.newBuilder(8100, new Properties()).buildInjector()
.getInstance(RestAnnotationProcessor.class); .getInstance(RestAnnotationProcessor.class);
private HttpCommand createCommand() throws SecurityException, NoSuchMethodException { private HttpCommand createCommand() throws SecurityException, NoSuchMethodException {

View File

@ -24,6 +24,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.PathParam; import javax.ws.rs.PathParam;
import org.jclouds.ContextBuilder; import org.jclouds.ContextBuilder;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.IntegrationTestAsyncClient; import org.jclouds.http.IntegrationTestAsyncClient;
import org.jclouds.http.IntegrationTestClient; import org.jclouds.http.IntegrationTestClient;
import org.jclouds.predicates.validators.AllLowerCaseValidator; import org.jclouds.predicates.validators.AllLowerCaseValidator;
@ -35,6 +36,7 @@ import org.testng.TestException;
import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.reflect.Invokable; import com.google.common.reflect.Invokable;
import com.google.inject.Injector; import com.google.inject.Injector;
@ -105,7 +107,7 @@ public class InputParamValidatorTest {
} }
Injector injector; Injector injector;
RestAnnotationProcessor restAnnotationProcessor; Function<Invocation, HttpRequest> restAnnotationProcessor;
@BeforeClass @BeforeClass
void setupFactory() { void setupFactory() {

View File

@ -0,0 +1,135 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy current the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.rest.config;
import static org.jclouds.reflect.Reflection2.method;
import static org.testng.Assert.assertEquals;
import java.util.Properties;
import javax.inject.Named;
import org.jclouds.Fallbacks.FalseOnNotFoundOr404;
import org.jclouds.Fallbacks.NullOnNotFoundOr404;
import org.jclouds.http.HttpResponse;
import org.jclouds.internal.FilterStringsBoundToInjectorByName;
import org.jclouds.reflect.Invocation;
import org.jclouds.rest.annotations.Fallback;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.name.Names;
/**
*
* @author Adrian Cole
*/
@Test(groups = "unit", singleThreaded = true)
public class ReadAnnotationsAndPropertiesTest {
public static interface ThingApi {
HttpResponse get();
HttpResponse namedGet();
}
public static interface ThingAsyncApi {
ListenableFuture<HttpResponse> get();
@Named("ns:get")
@Fallback(FalseOnNotFoundOr404.class)
ListenableFuture<HttpResponse> namedGet();
}
private Invocation asyncGet;
private Invocation asyncNamedGet;
private org.jclouds.Fallback<Object> defaultFallback;
@BeforeClass
void setupInvocations() throws SecurityException, NoSuchMethodException {
asyncGet = Invocation.create(method(ThingAsyncApi.class, "get"), ImmutableList.of());
asyncNamedGet = Invocation.create(method(ThingAsyncApi.class, "namedGet"), ImmutableList.of());
defaultFallback = new NullOnNotFoundOr404();
}
/**
* this functionality will be removed once Named annotations are on all async
* classes.
*/
public void testInvocationsSetDefaultTimeoutOnAsyncMethods() throws Exception {
final Properties props = new Properties();
props.setProperty("jclouds.timeouts.default", "250");
Injector injector = Guice.createInjector(new AbstractModule() {
protected void configure() {
Names.bindProperties(binder(), props);
}
});
ReadAnnotationsAndProperties config = new ReadAnnotationsAndProperties(injector,
new FilterStringsBoundToInjectorByName(injector), defaultFallback);
assertEquals(config.getTimeoutNanos(asyncGet), Optional.of(250000000l));
assertEquals(config.getTimeoutNanos(asyncNamedGet), Optional.of(250000000l));
}
public void testNamedInvocationGetsTimeoutOverrideOnAsyncMethods() throws Exception {
final Properties props = new Properties();
props.setProperty("jclouds.timeouts.default", "50");
props.setProperty("jclouds.timeouts.ThingApi", "100");
props.setProperty("jclouds.timeouts.ns:get", "250");
Injector injector = Guice.createInjector(new AbstractModule() {
protected void configure() {
Names.bindProperties(binder(), props);
}
});
ReadAnnotationsAndProperties config = new ReadAnnotationsAndProperties(injector,
new FilterStringsBoundToInjectorByName(injector), defaultFallback);
assertEquals(config.getTimeoutNanos(asyncNamedGet), Optional.of(250000000l));
}
/**
* this functionality will be removed once Named annotations are on all async
* classes.
*/
public void testNamingConventionOfUnnamedMethods() throws Exception {
Injector injector = Guice.createInjector();
ReadAnnotationsAndProperties config = new ReadAnnotationsAndProperties(injector,
new FilterStringsBoundToInjectorByName(injector), defaultFallback);
assertEquals(config.getCommandName(asyncGet), "ThingApi.get");
}
public void testNamingConventionOfNamedAsyncMethods() throws Exception {
Injector injector = Guice.createInjector();
ReadAnnotationsAndProperties config = new ReadAnnotationsAndProperties(injector,
new FilterStringsBoundToInjectorByName(injector), defaultFallback);
assertEquals(config.getCommandName(asyncNamedGet), "ns:get");
}
public void testFallbackOverride() throws Exception {
Injector injector = Guice.createInjector();
ReadAnnotationsAndProperties config = new ReadAnnotationsAndProperties(injector,
new FilterStringsBoundToInjectorByName(injector), defaultFallback);
assertEquals(config.getFallback(asyncNamedGet).getClass(), FalseOnNotFoundOr404.class);
assertEquals(config.getFallback(asyncGet), defaultFallback);
}
}

View File

@ -84,12 +84,15 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.io.InputSupplier; import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.common.util.concurrent.TimeLimiter;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
/** /**
@ -236,6 +239,12 @@ public abstract class BaseRestApiExpectTest<S> {
}).toInstance(fn); }).toInstance(fn);
bind(HttpCommandExecutorService.class).to(ExpectHttpCommandExecutorService.class); bind(HttpCommandExecutorService.class).to(ExpectHttpCommandExecutorService.class);
} }
@Provides
@Singleton
TimeLimiter timeLimiter(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor){
return new SimpleTimeLimiter(userExecutor);
}
} }
/** /**

View File

@ -33,6 +33,9 @@ import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.Map.Entry; import java.util.Map.Entry;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.concurrent.config.ConfiguresExecutorService; import org.jclouds.concurrent.config.ConfiguresExecutorService;
import org.jclouds.fallbacks.MapHttp4xxCodesToExceptions; import org.jclouds.fallbacks.MapHttp4xxCodesToExceptions;
import org.jclouds.http.HttpCommandExecutorService; import org.jclouds.http.HttpCommandExecutorService;
@ -51,8 +54,11 @@ import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap; import com.google.common.collect.TreeMultimap;
import com.google.common.reflect.Invokable; import com.google.common.reflect.Invokable;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.common.util.concurrent.TimeLimiter;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Provides;
/** /**
* *
@ -83,6 +89,12 @@ public abstract class BaseRestApiTest {
bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor()); bind(ListeningExecutorService.class).annotatedWith(named(PROPERTY_IO_WORKER_THREADS)).toInstance(sameThreadExecutor());
bind(HttpCommandExecutorService.class).toInstance(mock); bind(HttpCommandExecutorService.class).toInstance(mock);
} }
@Provides
@Singleton
TimeLimiter timeLimiter(@Named(PROPERTY_USER_THREADS) ListeningExecutorService userExecutor){
return new SimpleTimeLimiter(userExecutor);
}
} }
protected void assertPayloadEquals(HttpRequest request, String toMatch, String contentType, boolean contentMD5) { protected void assertPayloadEquals(HttpRequest request, String toMatch, String contentType, boolean contentMD5) {

View File

@ -1,138 +0,0 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.rest.internal;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.jclouds.reflect.Reflection2.method;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Named;
import org.jclouds.reflect.Invocation;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
/**
*
* @author Adrian Cole
*/
@Test(groups = "unit", singleThreaded = true)
public class BlockOnFutureTest {
static ListenableFuture<String> future;
public static class ThingAsyncApi {
public ListenableFuture<String> get() {
return future;
}
@Named("ns:get")
public ListenableFuture<String> namedGet() {
return future;
}
}
private Invocation get;
private Invocation namedGet;
@BeforeClass
void setupInvocations() throws SecurityException, NoSuchMethodException {
get = Invocation.create(method(ThingAsyncApi.class, "get"), ImmutableList.of());
namedGet = Invocation.create(method(ThingAsyncApi.class, "namedGet"), ImmutableList.of());
}
@SuppressWarnings("unchecked")
@BeforeMethod
void createMockedFuture() throws InterruptedException, ExecutionException, TimeoutException {
future = createMock(ListenableFuture.class);
expect(future.get(250000000, TimeUnit.NANOSECONDS)).andReturn("foo");
replay(future);
}
public void testUnnamedMethodWithDefaultPropTimeout() throws Exception {
Function<ListenableFuture<?>, Object> withOverride = new BlockOnFuture(ImmutableMap.of("default", 250L), get);
assertEquals(withOverride.apply(future), "foo");
verify(future);
}
public void testUnnamedMethodWithClassPropTimeout() throws Exception {
Function<ListenableFuture<?>, Object> withOverride = new BlockOnFuture(ImmutableMap.of("default", 50L,
"ThingApi", 250L), get);
assertEquals(withOverride.apply(future), "foo");
verify(future);
}
public void testUnnamedMethodWithMethodPropTimeout() throws Exception {
Function<ListenableFuture<?>, Object> withOverride = new BlockOnFuture(ImmutableMap.of("default", 50L,
"ThingApi", 100L, "ThingApi.get", 250L), get);
assertEquals(withOverride.apply(future), "foo");
verify(future);
}
@SuppressWarnings("unchecked")
public void testUnnamedMethodWithNoTimeoutsCallGetDirectly() throws Exception {
future = createMock(ListenableFuture.class);
expect(future.get()).andReturn("foo");
replay(future);
Function<ListenableFuture<?>, Object> noOverrides = new BlockOnFuture(ImmutableMap.<String, Long> of(), get);
assertEquals(noOverrides.apply(future), "foo");
verify(future);
}
public void testNamedMethodWithDefaultPropTimeout() throws Exception {
Function<ListenableFuture<?>, Object> withOverride = new BlockOnFuture(ImmutableMap.of("default", 250L), namedGet);
assertEquals(withOverride.apply(future), "foo");
verify(future);
}
public void testNamedMethodWithMethodPropTimeout() throws Exception {
Function<ListenableFuture<?>, Object> withOverride = new BlockOnFuture(ImmutableMap.of("default", 50L,
"ThingApi", 100L, "ns:get", 250L), namedGet);
assertEquals(withOverride.apply(future), "foo");
verify(future);
}
@SuppressWarnings("unchecked")
public void testNamedMethodWithNoTimeoutsCallGetDirectly() throws Exception {
future = createMock(ListenableFuture.class);
expect(future.get()).andReturn("foo");
replay(future);
Function<ListenableFuture<?>, Object> noOverrides = new BlockOnFuture(ImmutableMap.<String, Long> of(), namedGet);
assertEquals(noOverrides.apply(future), "foo");
verify(future);
}
}

View File

@ -0,0 +1,177 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.rest.internal;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.jclouds.reflect.Reflection2.method;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import javax.inject.Named;
import org.jclouds.http.HttpCommand;
import org.jclouds.http.HttpCommandExecutorService;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.HttpResponse;
import org.jclouds.reflect.Invocation;
import org.jclouds.rest.config.InvocationConfig;
import org.jclouds.rest.internal.InvokeHttpMethod.InvokeAndTransform;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.TimeLimiter;
/**
*
* @author Adrian Cole
*/
@Test(groups = "unit", singleThreaded = true)
public class InvokeHttpMethodTest {
public static interface ThingApi {
HttpResponse get();
}
public static interface ThingAsyncApi {
@Named("ns:get")
ListenableFuture<HttpResponse> get();
}
private Invocation get;
private Invocation asyncGet;
private Function<Invocation, Invocation> sync2async;
private HttpRequest getRequest = HttpRequest.builder().method("GET").endpoint("http://get").build();
private HttpCommand getCommand = new HttpCommand(getRequest);
private Function<Invocation, HttpRequest> toRequest;
@BeforeClass
void setupInvocations() throws SecurityException, NoSuchMethodException {
get = Invocation.create(method(ThingApi.class, "get"), ImmutableList.of());
asyncGet = Invocation.create(method(ThingAsyncApi.class, "get"), ImmutableList.of());
sync2async = Functions.forMap(ImmutableMap.of(get, asyncGet));
toRequest = Functions.forMap(ImmutableMap.of(asyncGet, getRequest));
}
@SuppressWarnings("unchecked")
private Function<HttpRequest, Function<HttpResponse, ?>> transformerForRequest = Function.class.cast(Functions
.constant(Functions.identity()));
private ListeningExecutorService userThreads = MoreExecutors.sameThreadExecutor();
private HttpResponse response = HttpResponse.builder().statusCode(200).payload("foo").build();
private HttpCommandExecutorService http;
private TimeLimiter timeLimiter;
@SuppressWarnings("rawtypes")
private org.jclouds.Fallback fallback;
private InvocationConfig config;
private InvokeHttpMethod invokeHttpMethod;
private ListenableFuture<HttpResponse> future;
@SuppressWarnings("unchecked")
@BeforeMethod
void createMocks() {
http = createMock(HttpCommandExecutorService.class);
timeLimiter = createMock(TimeLimiter.class);
fallback = createMock(org.jclouds.Fallback.class);
config = createMock(InvocationConfig.class);
future = createMock(ListenableFuture.class);
invokeHttpMethod = new InvokeHttpMethod(sync2async, toRequest, http, transformerForRequest, timeLimiter, config,
userThreads);
expect(config.getCommandName(asyncGet)).andReturn("ns:get");
expect(config.getFallback(asyncGet)).andReturn(fallback);
}
@AfterMethod
void verifyMocks() {
verify(http, timeLimiter, fallback, config, future);
}
public void testMethodWithTimeoutRunsTimeLimiter() throws Exception {
expect(config.getTimeoutNanos(asyncGet)).andReturn(Optional.of(250000000l));
InvokeAndTransform invoke = invokeHttpMethod.new InvokeAndTransform("ns:get", getCommand);
expect(timeLimiter.callWithTimeout(invoke, 250000000, TimeUnit.NANOSECONDS, true)).andReturn(response);
replay(http, timeLimiter, fallback, config, future);
invokeHttpMethod.apply(get);
}
public void testMethodWithNoTimeoutCallGetDirectly() throws Exception {
expect(config.getTimeoutNanos(asyncGet)).andReturn(Optional.<Long> absent());
expect(http.invoke(new HttpCommand(getRequest))).andReturn(response);
replay(http, timeLimiter, fallback, config, future);
invokeHttpMethod.apply(get);
}
public void testAsyncMethodSubmitsRequest() throws Exception {
expect(http.submit(new HttpCommand(getRequest))).andReturn(future);
future.addListener(anyObject(Runnable.class), eq(userThreads));
replay(http, timeLimiter, fallback, config, future);
invokeHttpMethod.apply(asyncGet);
}
private HttpResponse fallbackResponse = HttpResponse.builder().statusCode(200).payload("bar").build();
public void testDirectCallRunsFallbackCreateOrPropagate() throws Exception {
IllegalStateException exception = new IllegalStateException();
expect(config.getTimeoutNanos(asyncGet)).andReturn(Optional.<Long> absent());
expect(http.invoke(new HttpCommand(getRequest))).andThrow(exception);
expect(fallback.createOrPropagate(exception)).andReturn(fallbackResponse);
replay(http, timeLimiter, fallback, config, future);
assertEquals(invokeHttpMethod.apply(get), fallbackResponse);
}
public void testTimeLimitedRunsFallbackCreateOrPropagate() throws Exception {
IllegalStateException exception = new IllegalStateException();
expect(config.getTimeoutNanos(asyncGet)).andReturn(Optional.of(250000000l));
InvokeAndTransform invoke = invokeHttpMethod.new InvokeAndTransform("ns:get", getCommand);
expect(timeLimiter.callWithTimeout(invoke, 250000000, TimeUnit.NANOSECONDS, true)).andThrow(exception);
expect(fallback.createOrPropagate(exception)).andReturn(fallbackResponse);
replay(http, timeLimiter, fallback, config, future);
assertEquals(invokeHttpMethod.apply(get), fallbackResponse);
}
@SuppressWarnings("unchecked")
public void testSubmitRunsFallbackCreateOnGet() throws Exception {
IllegalStateException exception = new IllegalStateException();
expect(http.submit(new HttpCommand(getRequest))).andReturn(
Futures.<HttpResponse> immediateFailedFuture(exception));
expect(fallback.create(exception)).andReturn(Futures.<HttpResponse> immediateFuture(fallbackResponse));
// not using the field, as you can see above we are making an immediate
// failed future instead.
future = createMock(ListenableFuture.class);
replay(http, timeLimiter, fallback, config, future);
assertEquals(ListenableFuture.class.cast(invokeHttpMethod.apply(asyncGet)).get(), fallbackResponse);
}
}

View File

@ -249,9 +249,14 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
@Override @Override
public ListenableFuture<HttpResponse> submit(HttpCommand command) { public ListenableFuture<HttpResponse> submit(HttpCommand command) {
return Futures.immediateFuture(invoke(command));
}
@Override
public HttpResponse invoke(HttpCommand command) {
assertEquals(command.getCurrentRequest().getRequestLine(), assertEquals(command.getCurrentRequest().getRequestLine(),
"GET http://localhost:9999/client/1/foo HTTP/1.1"); "GET http://localhost:9999/client/1/foo HTTP/1.1");
return Futures.immediateFuture(HttpResponse.builder().build()); return HttpResponse.builder().build();
} }
}); });
@ -274,6 +279,11 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
@Override @Override
public ListenableFuture<HttpResponse> submit(HttpCommand command) { public ListenableFuture<HttpResponse> submit(HttpCommand command) {
return Futures.immediateFuture(invoke(command));
}
@Override
public HttpResponse invoke(HttpCommand command) {
if (callCounter == 1) if (callCounter == 1)
assertEquals(command.getCurrentRequest().getRequestLine(), assertEquals(command.getCurrentRequest().getRequestLine(),
"GET http://localhost:1111/client/1/bar/2 HTTP/1.1"); "GET http://localhost:1111/client/1/bar/2 HTTP/1.1");
@ -281,7 +291,7 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
assertEquals(command.getCurrentRequest().getRequestLine(), assertEquals(command.getCurrentRequest().getRequestLine(),
"GET http://localhost:1111/client/1/foo HTTP/1.1"); "GET http://localhost:1111/client/1/foo HTTP/1.1");
callCounter++; callCounter++;
return Futures.immediateFuture(HttpResponse.builder().build()); return HttpResponse.builder().build();
} }
}); });
@ -304,8 +314,13 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
@Override @Override
public ListenableFuture<HttpResponse> submit(HttpCommand command) { public ListenableFuture<HttpResponse> submit(HttpCommand command) {
return Futures.immediateFuture(invoke(command));
}
@Override
public HttpResponse invoke(HttpCommand command) {
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1"); assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
return Futures.immediateFuture(HttpResponse.builder().build()); return HttpResponse.builder().build();
} }
}); });
@ -329,9 +344,14 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
@Override @Override
public ListenableFuture<HttpResponse> submit(HttpCommand command) { public ListenableFuture<HttpResponse> submit(HttpCommand command) {
return Futures.immediateFuture(invoke(command));
}
@Override
public HttpResponse invoke(HttpCommand command) {
assertEquals(command.getCurrentRequest().getRequestLine(), assertEquals(command.getCurrentRequest().getRequestLine(),
"GET http://howdyboys/testing/testing/thepathparam/client/1/foo HTTP/1.1"); "GET http://howdyboys/testing/testing/thepathparam/client/1/foo HTTP/1.1");
return Futures.immediateFuture(HttpResponse.builder().build()); return HttpResponse.builder().build();
} }
}); });
@ -355,8 +375,13 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
@Override @Override
public ListenableFuture<HttpResponse> submit(HttpCommand command) { public ListenableFuture<HttpResponse> submit(HttpCommand command) {
return Futures.immediateFuture(invoke(command));
}
@Override
public HttpResponse invoke(HttpCommand command) {
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1"); assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
return Futures.immediateFuture(HttpResponse.builder().build()); return HttpResponse.builder().build();
} }
}); });
@ -380,8 +405,13 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
@Override @Override
public ListenableFuture<HttpResponse> submit(HttpCommand command) { public ListenableFuture<HttpResponse> submit(HttpCommand command) {
return Futures.immediateFuture(invoke(command));
}
@Override
public HttpResponse invoke(HttpCommand command) {
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1"); assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
return Futures.immediateFuture(HttpResponse.builder().build()); return HttpResponse.builder().build();
} }
}, new AbstractModule() { }, new AbstractModule() {
@ -419,8 +449,13 @@ public class RestAnnotationProcessorTest extends BaseRestApiTest {
@Override @Override
public ListenableFuture<HttpResponse> submit(HttpCommand command) { public ListenableFuture<HttpResponse> submit(HttpCommand command) {
return Futures.immediateFuture(invoke(command));
}
@Override
public HttpResponse invoke(HttpCommand command) {
assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1"); assertEquals(command.getCurrentRequest().getRequestLine(), "GET http://howdyboys/client/1/foo HTTP/1.1");
return Futures.immediateFuture(HttpResponse.builder().build()); return HttpResponse.builder().build();
} }
}); });