Use general cluster state batching mechanism for shard started
This commit modifies the handling of shard started cluster state updates to use the general cluster state batching mechanism. An advantage of this approach is that each listener is now correctly notified when its own task fails.
This commit is contained in:
parent
182c22f23f
commit
99eac0e7d7
|
@ -36,14 +36,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.*;
|
import org.elasticsearch.transport.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
|
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
|
||||||
|
|
||||||
|
@ -60,8 +58,6 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
private final AllocationService allocationService;
|
private final AllocationService allocationService;
|
||||||
private final RoutingService routingService;
|
private final RoutingService routingService;
|
||||||
|
|
||||||
private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
|
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
|
||||||
AllocationService allocationService, RoutingService routingService) {
|
AllocationService allocationService, RoutingService routingService) {
|
||||||
|
@ -185,60 +181,46 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final ShardStartedClusterStateHandler shardStartedClusterStateHandler =
|
||||||
|
new ShardStartedClusterStateHandler();
|
||||||
|
|
||||||
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
|
private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
|
||||||
logger.debug("received shard started for {}", shardRoutingEntry);
|
logger.debug("received shard started for {}", shardRoutingEntry);
|
||||||
// buffer shard started requests, and the state update tasks will simply drain it
|
|
||||||
// this is to optimize the number of "started" events we generate, and batch them
|
|
||||||
// possibly, we can do time based batching as well, but usually, we would want to
|
|
||||||
// process started events as fast as possible, to make shards available
|
|
||||||
startedShardsQueue.add(shardRoutingEntry);
|
|
||||||
|
|
||||||
clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
|
clusterService.submitStateUpdateTask(
|
||||||
new ClusterStateUpdateTask() {
|
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
|
||||||
@Override
|
shardRoutingEntry,
|
||||||
public Priority priority() {
|
ClusterStateTaskConfig.build(Priority.URGENT),
|
||||||
return Priority.URGENT;
|
shardStartedClusterStateHandler,
|
||||||
|
shardStartedClusterStateHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
|
||||||
|
@Override
|
||||||
|
public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
|
||||||
|
BatchResult.Builder<ShardRoutingEntry> builder = BatchResult.builder();
|
||||||
|
ClusterState accumulator = ClusterState.builder(currentState).build();
|
||||||
|
for (ShardRoutingEntry task : tasks) {
|
||||||
|
task.processed = true;
|
||||||
|
try {
|
||||||
|
// Apply each task on top of the state accumulated so far in this batch, not the
// original state; otherwise a later routingResult(...) replaces the routing produced
// by earlier tasks and their started shards are lost.
RoutingAllocation.Result result =
|
||||||
|
allocationService.applyStartedShard(accumulator, task.shardRouting, true);
|
||||||
|
builder.success(task);
|
||||||
|
if (result.changed()) {
|
||||||
|
accumulator = ClusterState.builder(accumulator).routingResult(result).build();
|
||||||
}
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
builder.failure(task, t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
return builder.build(accumulator);
|
||||||
public ClusterState execute(ClusterState currentState) {
|
}
|
||||||
|
|
||||||
if (shardRoutingEntry.processed) {
|
@Override
|
||||||
return currentState;
|
public void onFailure(String source, Throwable t) {
|
||||||
}
|
logger.error("unexpected failure during [{}]", t, source);
|
||||||
|
}
|
||||||
List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<>();
|
|
||||||
startedShardsQueue.drainTo(shardRoutingEntries);
|
|
||||||
|
|
||||||
// nothing to process (a previous event has processed it already)
|
|
||||||
if (shardRoutingEntries.isEmpty()) {
|
|
||||||
return currentState;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<ShardRouting> shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size());
|
|
||||||
|
|
||||||
// mark all entries as processed
|
|
||||||
for (ShardRoutingEntry entry : shardRoutingEntries) {
|
|
||||||
entry.processed = true;
|
|
||||||
shardRoutingToBeApplied.add(entry.shardRouting);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (shardRoutingToBeApplied.isEmpty()) {
|
|
||||||
return currentState;
|
|
||||||
}
|
|
||||||
|
|
||||||
RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shardRoutingToBeApplied, true);
|
|
||||||
if (!routingResult.changed()) {
|
|
||||||
return currentState;
|
|
||||||
}
|
|
||||||
return ClusterState.builder(currentState).routingResult(routingResult).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(String source, Throwable t) {
|
|
||||||
logger.error("unexpected failure during [{}]", t, source);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
|
||||||
|
|
|
@ -63,6 +63,22 @@ public class AllocationService extends AbstractComponent {
|
||||||
this.clusterInfoService = clusterInfoService;
|
this.clusterInfoService = clusterInfoService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Applies the started shard. Note, shards can be called several
|
||||||
|
* times within this method. If the same instance of the routing
|
||||||
|
* table is returned, then no change has been made.
|
||||||
|
* @param clusterState the cluster state
|
||||||
|
* @param startedShard the shard to start
|
||||||
|
* @param withReroute whether or not to reroute the resulting allocation
|
||||||
|
* @return the resulting routing table
|
||||||
|
*/
|
||||||
|
public RoutingAllocation.Result applyStartedShard(
|
||||||
|
ClusterState clusterState,
|
||||||
|
ShardRouting startedShard,
|
||||||
|
boolean withReroute) {
|
||||||
|
return applyStartedShards(clusterState, Collections.singletonList(startedShard), withReroute);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Applies the started shards. Note, shards can be called several times within this method.
|
* Applies the started shards. Note, shards can be called several times within this method.
|
||||||
* <p>
|
* <p>
|
||||||
|
|
Loading…
Reference in New Issue