upgrade to guava 10.0.1 and fix assembly

This commit is contained in:
Shay Banon 2011-12-06 21:09:28 +02:00
parent 9781d8675d
commit c1b86f5786
5 changed files with 367 additions and 12 deletions

View File

@ -91,8 +91,14 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>r09</version>
<version>10.0.1</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -5,13 +5,6 @@
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>org.apache.lucene:lucene*</include>
</includes>
</dependencySet>
<dependencySet>
<outputDirectory>/lib</outputDirectory>
<useTransitiveFiltering>true</useTransitiveFiltering>
<scope>provided</scope>
<includes>
<include>log4j:log4j</include>
<include>jline:jline</include>
<include>net.java.dev.jna:jna</include>

View File

@ -19,13 +19,13 @@
package org.elasticsearch.action.support;
import com.google.common.util.concurrent.AbstractFuture;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.ElasticSearchTimeoutException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import java.util.concurrent.ExecutionException;
@ -35,7 +35,7 @@ import java.util.concurrent.TimeoutException;
/**
*
*/
public abstract class AdapterActionFuture<T, L> extends AbstractFuture<T> implements ActionFuture<T>, ActionListener<L> {
public abstract class AdapterActionFuture<T, L> extends BaseFuture<T> implements ActionFuture<T>, ActionListener<L> {
private Throwable rootFailure;

View File

@ -0,0 +1,356 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import com.google.common.annotations.Beta;
import org.elasticsearch.common.Nullable;
import java.util.concurrent.*;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* An abstract implementation of the {@link com.google.common.util.concurrent.ListenableFuture} interface. This
* class is preferable to {@link java.util.concurrent.FutureTask} for two
* reasons: It implements {@code ListenableFuture}, and it does not implement
* {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
* ListenableFuture}, create a {@link com.google.common.util.concurrent.ListenableFutureTask}, or submit your
* tasks to a {@link com.google.common.util.concurrent.ListeningExecutorService}.)
* <p/>
* <p>This class implements all methods in {@code ListenableFuture}.
* Subclasses should provide a way to set the result of the computation through
* the protected methods {@link #set(Object)} and
* {@link #setException(Throwable)}. Subclasses may also override {@link
* #interruptTask()}, which will be invoked automatically if a call to {@link
* #cancel(boolean) cancel(true)} succeeds in canceling the future.
* <p/>
* <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
* with concurrency issues and guarantee thread safety.
* <p/>
* <p>The state changing methods all return a boolean indicating success or
* failure in changing the future's state. Valid states are running,
* completed, failed, or cancelled.
* <p/>
* <p>This class uses an {@link com.google.common.util.concurrent.ExecutionList} to guarantee that all registered
* listeners will be executed, either when the future finishes or, for listeners
* that are added after the future completes, immediately.
* {@code Runnable}-{@code Executor} pairs are stored in the execution list but
* are not necessarily executed in the order in which they were added. (If a
* listener is added after the Future is complete, it will be executed
* immediately, even if earlier listeners have not been executed. Additionally,
* executors need not guarantee FIFO execution, or different listeners may run
* in different executors.)
*
* @author Sven Mawson
* @since 1.0
*/
// Same as AbstractFuture from Guava, but without the listeners
public abstract class BaseFuture<V> implements Future<V> {
/**
* Synchronization control for AbstractFutures.
*/
private final Sync<V> sync = new Sync<V>();
/*
* Improve the documentation of when InterruptedException is thrown. Our
* behavior matches the JDK's, but the JDK's documentation is misleading.
*/
/**
* {@inheritDoc}
* <p/>
* <p>The default {@link BaseFuture} implementation throws {@code
* InterruptedException} if the current thread is interrupted before or during
* the call, even if the value is already available.
*
* @throws InterruptedException if the current thread was interrupted before
* or during the call (optional but recommended).
* @throws CancellationException {@inheritDoc}
*/
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
return sync.get(unit.toNanos(timeout));
}
/*
* Improve the documentation of when InterruptedException is thrown. Our
* behavior matches the JDK's, but the JDK's documentation is misleading.
*/
/**
* {@inheritDoc}
* <p/>
* <p>The default {@link BaseFuture} implementation throws {@code
* InterruptedException} if the current thread is interrupted before or during
* the call, even if the value is already available.
*
* @throws InterruptedException if the current thread was interrupted before
* or during the call (optional but recommended).
* @throws CancellationException {@inheritDoc}
*/
@Override
public V get() throws InterruptedException, ExecutionException {
return sync.get();
}
@Override
public boolean isDone() {
return sync.isDone();
}
@Override
public boolean isCancelled() {
return sync.isCancelled();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!sync.cancel()) {
return false;
}
done();
if (mayInterruptIfRunning) {
interruptTask();
}
return true;
}
/**
* Subclasses can override this method to implement interruption of the
* future's computation. The method is invoked automatically by a successful
* call to {@link #cancel(boolean) cancel(true)}.
* <p/>
* <p>The default implementation does nothing.
*
* @since 10.0
*/
protected void interruptTask() {
}
/**
* Subclasses should invoke this method to set the result of the computation
* to {@code value}. This will set the state of the future to
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the
* state was successfully changed.
*
* @param value the value that was the result of the task.
* @return true if the state was successfully changed.
*/
protected boolean set(@Nullable V value) {
boolean result = sync.set(value);
if (result) {
done();
}
return result;
}
/**
* Subclasses should invoke this method to set the result of the computation
* to an error, {@code throwable}. This will set the state of the future to
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the
* state was successfully changed.
*
* @param throwable the exception that the task failed with.
* @return true if the state was successfully changed.
* @throws Error if the throwable was an {@link Error}.
*/
protected boolean setException(Throwable throwable) {
boolean result = sync.setException(checkNotNull(throwable));
if (result) {
done();
}
// If it's an Error, we want to make sure it reaches the top of the
// call stack, so we rethrow it.
if (throwable instanceof Error) {
throw (Error) throwable;
}
return result;
}
@Beta
protected void done() {
}
/**
* <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
* private subclass to hold the synchronizer. This synchronizer is used to
* implement the blocking and waiting calls as well as to handle state changes
* in a thread-safe manner. The current state of the future is held in the
* Sync state, and the lock is released whenever the state changes to either
* {@link #COMPLETED} or {@link #CANCELLED}.
* <p/>
* <p>To avoid races between threads doing release and acquire, we transition
* to the final state in two steps. One thread will successfully CAS from
* RUNNING to COMPLETING, that thread will then set the result of the
* computation, and only then transition to COMPLETED or CANCELLED.
* <p/>
* <p>We don't use the integer argument passed between acquire methods so we
* pass around a -1 everywhere.
*/
static final class Sync<V> extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 0L;
/* Valid states. */
static final int RUNNING = 0;
static final int COMPLETING = 1;
static final int COMPLETED = 2;
static final int CANCELLED = 4;
private V value;
private Throwable exception;
/*
* Acquisition succeeds if the future is done, otherwise it fails.
*/
@Override
protected int tryAcquireShared(int ignored) {
if (isDone()) {
return 1;
}
return -1;
}
/*
* We always allow a release to go through, this means the state has been
* successfully changed and the result is available.
*/
@Override
protected boolean tryReleaseShared(int finalState) {
setState(finalState);
return true;
}
/**
* Blocks until the task is complete or the timeout expires. Throws a
* {@link TimeoutException} if the timer expires, otherwise behaves like
* {@link #get()}.
*/
V get(long nanos) throws TimeoutException, CancellationException,
ExecutionException, InterruptedException {
// Attempt to acquire the shared lock with a timeout.
if (!tryAcquireSharedNanos(-1, nanos)) {
throw new TimeoutException("Timeout waiting for task.");
}
return getValue();
}
/**
* Blocks until {@link #complete(Object, Throwable, int)} has been
* successfully called. Throws a {@link CancellationException} if the task
* was cancelled, or a {@link ExecutionException} if the task completed with
* an error.
*/
V get() throws CancellationException, ExecutionException,
InterruptedException {
// Acquire the shared lock allowing interruption.
acquireSharedInterruptibly(-1);
return getValue();
}
/**
* Implementation of the actual value retrieval. Will return the value
* on success, an exception on failure, a cancellation on cancellation, or
* an illegal state if the synchronizer is in an invalid state.
*/
private V getValue() throws CancellationException, ExecutionException {
int state = getState();
switch (state) {
case COMPLETED:
if (exception != null) {
throw new ExecutionException(exception);
} else {
return value;
}
case CANCELLED:
throw new CancellationException("Task was cancelled.");
default:
throw new IllegalStateException(
"Error, synchronizer in invalid state: " + state);
}
}
/**
* Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
*/
boolean isDone() {
return (getState() & (COMPLETED | CANCELLED)) != 0;
}
/**
* Checks if the state is {@link #CANCELLED}.
*/
boolean isCancelled() {
return getState() == CANCELLED;
}
/**
* Transition to the COMPLETED state and set the value.
*/
boolean set(@Nullable V v) {
return complete(v, null, COMPLETED);
}
/**
* Transition to the COMPLETED state and set the exception.
*/
boolean setException(Throwable t) {
return complete(null, t, COMPLETED);
}
/**
* Transition to the CANCELLED state.
*/
boolean cancel() {
return complete(null, null, CANCELLED);
}
/**
* Implementation of completing a task. Either {@code v} or {@code t} will
* be set but not both. The {@code finalState} is the state to change to
* from {@link #RUNNING}. If the state is not in the RUNNING state we
* return {@code false}.
*
* @param v the value to set as the result of the computation.
* @param t the exception to set as the result of the computation.
* @param finalState the state to transition to.
*/
private boolean complete(@Nullable V v, Throwable t, int finalState) {
if (compareAndSetState(RUNNING, COMPLETING)) {
this.value = v;
this.exception = t;
releaseShared(finalState);
return true;
}
// The state was not RUNNING, so there are no valid transitions.
return false;
}
}
}

View File

@ -19,11 +19,11 @@
package org.elasticsearch.transport;
import com.google.common.util.concurrent.AbstractFuture;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.ElasticSearchTimeoutException;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -32,7 +32,7 @@ import java.util.concurrent.TimeoutException;
/**
*
*/
public class PlainTransportFuture<V extends Streamable> extends AbstractFuture<V> implements TransportFuture<V>, TransportResponseHandler<V> {
public class PlainTransportFuture<V extends Streamable> extends BaseFuture<V> implements TransportFuture<V>, TransportResponseHandler<V> {
private final TransportResponseHandler<V> handler;