Inline Shard(.*)TransportHandler#handleShard\1OnMaster methods

This commit is contained in:
Jason Tedor 2016-01-04 11:07:42 -05:00
parent 754bd66b63
commit fdb0c909ec
1 changed files with 11 additions and 20 deletions

View File

@ -124,7 +124,13 @@ public class ShardStateAction extends AbstractComponent {
@Override @Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardFailureOnMaster(request, new ClusterStateTaskListener() { logger.warn("{} received shard failed for {}", request.failure, request.shardRouting.shardId(), request);
clusterService.submitStateUpdateTask(
"shard-failed (" + request.shardRouting + "), message [" + request.message + "]",
request,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
new ClusterStateTaskListener() {
@Override @Override
public void onFailure(String source, Throwable t) { public void onFailure(String source, Throwable t) {
logger.error("{} unexpected failure while failing shard [{}]", t, request.shardRouting.shardId(), request.shardRouting); logger.error("{} unexpected failure while failing shard [{}]", t, request.shardRouting.shardId(), request.shardRouting);
@ -156,16 +162,6 @@ public class ShardStateAction extends AbstractComponent {
} }
); );
} }
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry, ClusterStateTaskListener listener) {
logger.warn("{} received shard failed for [{}]", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
clusterService.submitStateUpdateTask(
"shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]",
shardRoutingEntry,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateHandler,
listener);
}
} }
class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry> { class ShardFailedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry> {
@ -230,19 +226,14 @@ public class ShardStateAction extends AbstractComponent {
@Override @Override
public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception {
handleShardStartedOnMaster(request); logger.debug("{} received shard started for [{}]", request.shardRouting.shardId(), request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
private void handleShardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("{} received shard started for [{}]", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
clusterService.submitStateUpdateTask( clusterService.submitStateUpdateTask(
"shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", "shard-started (" + request.shardRouting + "), reason [" + request.message + "]",
shardRoutingEntry, request,
ClusterStateTaskConfig.build(Priority.URGENT), ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateHandler, shardStartedClusterStateHandler,
shardStartedClusterStateHandler); shardStartedClusterStateHandler);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} }
} }