diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 9db2dfc7ac7..547cb8607bd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -49,11 +49,15 @@ import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule; import java.io.*; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static java.util.concurrent.Executors.*; import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.common.unit.TimeValue.*; +import static org.elasticsearch.common.util.concurrent.EsExecutors.*; /** * @author kimchy (shay.banon) @@ -78,6 +82,8 @@ public class LocalGateway extends AbstractLifecycleComponent implements private volatile LocalGatewayStartedShards currentStartedShards; + private volatile ExecutorService executor; + private volatile boolean initialized = false; @Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, @@ -105,12 +111,19 @@ public class LocalGateway extends AbstractLifecycleComponent implements } @Override protected void doStart() throws ElasticSearchException { + this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway")); lazyInitialize(); clusterService.add(this); } @Override protected void doStop() throws ElasticSearchException { clusterService.remove(this); + executor.shutdown(); + try { + executor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // ignore + } } @Override protected void doClose() throws ElasticSearchException { @@ -234,99 +247,108 @@ public class LocalGateway extends AbstractLifecycleComponent implements } if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) { - LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder(); - if (currentMetaState != null) { - builder.state(currentMetaState); - } - builder.version(event.state().version()); - builder.metaData(event.state().metaData()); - - try { - LocalGatewayMetaState stateToWrite = builder.build(); - XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); - xContentBuilder.prettyPrint(); - xContentBuilder.startObject(); - LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); - xContentBuilder.endObject(); - - File stateFile = new File(location, "metadata-" + event.state().version()); - FileOutputStream fos = new FileOutputStream(stateFile); - fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); - fos.close(); - - FileSystemUtils.syncFile(stateFile); - - currentMetaState = stateToWrite; - - // delete all the other files - File[] files = location.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("metadata-") && !name.equals("metadata-" + event.state().version()); + executor.execute(new Runnable() { + @Override public void run() { + LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder(); + if (currentMetaState != null) { + builder.state(currentMetaState); } - }); - for (File file : files) { - file.delete(); - } + builder.version(event.state().version()); + builder.metaData(event.state().metaData()); - } catch (IOException e) { - logger.warn("failed to write updated state", e); - } + try { + LocalGatewayMetaState stateToWrite = builder.build(); + XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); + xContentBuilder.prettyPrint(); + xContentBuilder.startObject(); + LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); + xContentBuilder.endObject(); + + File stateFile = new File(location, "metadata-" + event.state().version()); + FileOutputStream fos = new FileOutputStream(stateFile); + fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); + fos.close(); + + FileSystemUtils.syncFile(stateFile); + + currentMetaState = stateToWrite; + + // delete all the other files + File[] files = location.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("metadata-") && !name.equals("metadata-" + event.state().version()); + } + }); + for (File file : files) { + file.delete(); + } + + } catch (IOException e) { + logger.warn("failed to write updated state", e); + } + } + }); } if (event.state().nodes().localNode().dataNode() && event.routingTableChanged()) { - LocalGatewayStartedShards.Builder builder = LocalGatewayStartedShards.builder(); - if (currentStartedShards != null) { - builder.state(currentStartedShards); - } - builder.version(event.state().version()); - // remove from the current state all the shards that are primary and started, we won't need them anymore - for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { - for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { - if (indexShardRoutingTable.primaryShard().active()) { - builder.remove(indexShardRoutingTable.shardId()); + executor.execute(new Runnable() { + @Override public void run() { + LocalGatewayStartedShards.Builder builder = LocalGatewayStartedShards.builder(); + if (currentStartedShards != null) { + builder.state(currentStartedShards); } - } - } - // now, add all the ones that are active and on this node - RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId()); - if (routingNode != null) { - // out node is not in play yet... - for (MutableShardRouting shardRouting : routingNode) { - if (shardRouting.active()) { - builder.put(shardRouting.shardId(), event.state().version()); + builder.version(event.state().version()); + // remove from the current state all the shards that are primary and started, we won't need them anymore + for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + if (indexShardRoutingTable.primaryShard().active()) { + builder.remove(indexShardRoutingTable.shardId()); + } + } + } + // now, add all the ones that are active and on this node + RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId()); + if (routingNode != null) { + // out node is not in play yet... + for (MutableShardRouting shardRouting : routingNode) { + if (shardRouting.active()) { + builder.put(shardRouting.shardId(), event.state().version()); + } + } } - } - } - try { - LocalGatewayStartedShards stateToWrite = builder.build(); - XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); - xContentBuilder.prettyPrint(); - xContentBuilder.startObject(); - LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); - xContentBuilder.endObject(); + try { + LocalGatewayStartedShards stateToWrite = builder.build(); + XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); + xContentBuilder.prettyPrint(); + xContentBuilder.startObject(); + LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); + xContentBuilder.endObject(); - File stateFile = new File(location, "shards-" + event.state().version()); - FileOutputStream fos = new FileOutputStream(stateFile); - fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); - fos.close(); + File stateFile = new File(location, "shards-" + event.state().version()); + FileOutputStream fos = new FileOutputStream(stateFile); + fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); + fos.close(); - FileSystemUtils.syncFile(stateFile); + FileSystemUtils.syncFile(stateFile); - currentStartedShards = stateToWrite; - } catch (IOException e) { - logger.warn("failed to write updated state", e); - } + currentStartedShards = stateToWrite; + } catch (IOException e) { + logger.warn("failed to write updated state", e); + return; + } - // delete all the other files - File[] files = location.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("shards-") && !name.equals("shards-" + event.state().version()); + // delete all the other files + File[] files = location.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("shards-") && !name.equals("shards-" + event.state().version()); + } + }); + for (File file : files) { + file.delete(); + } } }); - for (File file : files) { - file.delete(); - } } }