move persistenting state and sharsd state in local gateway to another thread

This commit is contained in:
kimchy 2010-09-19 17:54:14 +02:00
parent f82ceb1e1e
commit 589dbce89b
1 changed files with 101 additions and 79 deletions

View File

@ -49,11 +49,15 @@ import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule;
import java.io.*; import java.io.*;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.Executors.*;
import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.MetaData.*; import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.unit.TimeValue.*; import static org.elasticsearch.common.unit.TimeValue.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -78,6 +82,8 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
private volatile LocalGatewayStartedShards currentStartedShards; private volatile LocalGatewayStartedShards currentStartedShards;
private volatile ExecutorService executor;
private volatile boolean initialized = false; private volatile boolean initialized = false;
@Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService, @Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
@ -105,12 +111,19 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
} }
@Override protected void doStart() throws ElasticSearchException { @Override protected void doStart() throws ElasticSearchException {
this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway"));
lazyInitialize(); lazyInitialize();
clusterService.add(this); clusterService.add(this);
} }
@Override protected void doStop() throws ElasticSearchException { @Override protected void doStop() throws ElasticSearchException {
clusterService.remove(this); clusterService.remove(this);
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
} }
@Override protected void doClose() throws ElasticSearchException { @Override protected void doClose() throws ElasticSearchException {
@ -234,99 +247,108 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
} }
if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) { if (event.state().nodes().localNode().masterNode() && event.metaDataChanged()) {
LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder(); executor.execute(new Runnable() {
if (currentMetaState != null) { @Override public void run() {
builder.state(currentMetaState); LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder();
} if (currentMetaState != null) {
builder.version(event.state().version()); builder.state(currentMetaState);
builder.metaData(event.state().metaData());
try {
LocalGatewayMetaState stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON);
xContentBuilder.prettyPrint();
xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
File stateFile = new File(location, "metadata-" + event.state().version());
FileOutputStream fos = new FileOutputStream(stateFile);
fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength());
fos.close();
FileSystemUtils.syncFile(stateFile);
currentMetaState = stateToWrite;
// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("metadata-") && !name.equals("metadata-" + event.state().version());
} }
}); builder.version(event.state().version());
for (File file : files) { builder.metaData(event.state().metaData());
file.delete();
}
} catch (IOException e) { try {
logger.warn("failed to write updated state", e); LocalGatewayMetaState stateToWrite = builder.build();
} XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON);
xContentBuilder.prettyPrint();
xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
File stateFile = new File(location, "metadata-" + event.state().version());
FileOutputStream fos = new FileOutputStream(stateFile);
fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength());
fos.close();
FileSystemUtils.syncFile(stateFile);
currentMetaState = stateToWrite;
// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("metadata-") && !name.equals("metadata-" + event.state().version());
}
});
for (File file : files) {
file.delete();
}
} catch (IOException e) {
logger.warn("failed to write updated state", e);
}
}
});
} }
if (event.state().nodes().localNode().dataNode() && event.routingTableChanged()) { if (event.state().nodes().localNode().dataNode() && event.routingTableChanged()) {
LocalGatewayStartedShards.Builder builder = LocalGatewayStartedShards.builder(); executor.execute(new Runnable() {
if (currentStartedShards != null) { @Override public void run() {
builder.state(currentStartedShards); LocalGatewayStartedShards.Builder builder = LocalGatewayStartedShards.builder();
} if (currentStartedShards != null) {
builder.version(event.state().version()); builder.state(currentStartedShards);
// remove from the current state all the shards that are primary and started, we won't need them anymore
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable.primaryShard().active()) {
builder.remove(indexShardRoutingTable.shardId());
} }
} builder.version(event.state().version());
} // remove from the current state all the shards that are primary and started, we won't need them anymore
// now, add all the ones that are active and on this node for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId()); for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (routingNode != null) { if (indexShardRoutingTable.primaryShard().active()) {
// out node is not in play yet... builder.remove(indexShardRoutingTable.shardId());
for (MutableShardRouting shardRouting : routingNode) { }
if (shardRouting.active()) { }
builder.put(shardRouting.shardId(), event.state().version()); }
// now, add all the ones that are active and on this node
RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId());
if (routingNode != null) {
// out node is not in play yet...
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
builder.put(shardRouting.shardId(), event.state().version());
}
}
} }
}
}
try { try {
LocalGatewayStartedShards stateToWrite = builder.build(); LocalGatewayStartedShards stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON); XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON);
xContentBuilder.prettyPrint(); xContentBuilder.prettyPrint();
xContentBuilder.startObject(); xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject(); xContentBuilder.endObject();
File stateFile = new File(location, "shards-" + event.state().version()); File stateFile = new File(location, "shards-" + event.state().version());
FileOutputStream fos = new FileOutputStream(stateFile); FileOutputStream fos = new FileOutputStream(stateFile);
fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength()); fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength());
fos.close(); fos.close();
FileSystemUtils.syncFile(stateFile); FileSystemUtils.syncFile(stateFile);
currentStartedShards = stateToWrite; currentStartedShards = stateToWrite;
} catch (IOException e) { } catch (IOException e) {
logger.warn("failed to write updated state", e); logger.warn("failed to write updated state", e);
} return;
}
// delete all the other files // delete all the other files
File[] files = location.listFiles(new FilenameFilter() { File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) { @Override public boolean accept(File dir, String name) {
return name.startsWith("shards-") && !name.equals("shards-" + event.state().version()); return name.startsWith("shards-") && !name.equals("shards-" + event.state().version());
}
});
for (File file : files) {
file.delete();
}
} }
}); });
for (File file : files) {
file.delete();
}
} }
} }