Modify cluster state callback in recovery land

We use a callback in recovery land during primary relocation to ensure
the relocation target is on at least the same version as the relocation
source. This callback is typed as a Callback<Long> which is an
unnecessary custom type (we can use Consumer<T> or the appropriate
primitive callbacks). Here, we can use LongConsumer.

Relates #25081
This commit is contained in:
Jason Tedor 2017-06-06 16:29:10 -04:00 committed by GitHub
parent 5a0b159cb7
commit 1a681a928d
2 changed files with 7 additions and 8 deletions

View File

@ -25,8 +25,6 @@ import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShard;
@ -39,7 +37,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.LongConsumer;
/**
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
@ -54,9 +52,9 @@ public class RecoveriesCollection {
private final Logger logger;
private final ThreadPool threadPool;
private final Callback<Long> ensureClusterStateVersionCallback;
private final LongConsumer ensureClusterStateVersionCallback;
public RecoveriesCollection(Logger logger, ThreadPool threadPool, Callback<Long> ensureClusterStateVersionCallback) {
public RecoveriesCollection(Logger logger, ThreadPool threadPool, LongConsumer ensureClusterStateVersionCallback) {
this.logger = logger;
this.threadPool = threadPool;
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;

View File

@ -59,6 +59,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
/**
* Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
@ -79,7 +80,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final String tempFilePrefix;
private final Store store;
private final PeerRecoveryTargetService.RecoveryListener listener;
private final Callback<Long> ensureClusterStateVersionCallback;
private final LongConsumer ensureClusterStateVersionCallback;
private final AtomicBoolean finished = new AtomicBoolean();
@ -107,7 +108,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
public RecoveryTarget(final IndexShard indexShard,
final DiscoveryNode sourceNode,
final PeerRecoveryTargetService.RecoveryListener listener,
final Callback<Long> ensureClusterStateVersionCallback) {
final LongConsumer ensureClusterStateVersionCallback) {
super("recovery_status");
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
@ -371,7 +372,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
@Override
public void ensureClusterStateVersion(long clusterStateVersion) {
ensureClusterStateVersionCallback.handle(clusterStateVersion);
ensureClusterStateVersionCallback.accept(clusterStateVersion);
}
@Override