Improve the order of shutdown of top-level components in the node: close indices before applying cluster changes. Also improve the atomicity of closing indices and shards.

This commit is contained in:
kimchy 2011-02-01 23:10:15 +02:00
parent eb1db1140b
commit d2a3f5142f
5 changed files with 35 additions and 8 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.service;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableMap;
@ -114,6 +115,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of();
private volatile boolean closed = false;
private final CleanCacheOnIndicesLifecycleListener cleanCacheOnIndicesLifecycleListener = new CleanCacheOnIndicesLifecycleListener();
@Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool,
@ -209,6 +212,9 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
}
public void close(final boolean delete, final String reason) {
synchronized (this) {
closed = true;
}
try {
Set<Integer> shardIds = shardIds();
final CountDownLatch latch = new CountDownLatch(shardIds.size());
@ -248,6 +254,9 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
}
@Override public synchronized IndexShard createShard(int sShardId) throws ElasticSearchException {
if (closed) {
throw new ElasticSearchIllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed");
}
ShardId shardId = new ShardId(index, sShardId);
if (shardsInjectors.containsKey(shardId.id())) {
throw new IndexShardAlreadyExistsException(shardId + " already exists");

View File

@ -23,6 +23,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.FieldCache;
import org.apache.lucene.search.IndexReaderPurgedListener;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.UnmodifiableIterator;
@ -209,6 +210,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
}
public synchronized IndexService createIndex(String sIndexName, Settings settings, String localNodeId) throws ElasticSearchException {
if (!lifecycle.started()) {
throw new ElasticSearchIllegalStateException("Can't create an index [" + sIndexName + "] is closed");
}
Index index = new Index(sIndexName);
if (indicesInjectors.containsKey(index.name())) {
throw new IndexAlreadyExistsException(index);

View File

@ -124,6 +124,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (!indicesService.changesAllowed())
return;
if (!lifecycle.started()) {
return;
}
synchronized (mutex) {
applyNewIndices(event);
applyMappings(event);

View File

@ -204,6 +204,16 @@ public final class InternalNode implements Node {
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).stop();
}
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
// we close indices first, so operations won't be allowed on it
injector.getInstance(IndicesService.class).stop();
// sleep a bit to let operations finish with indices service
// try {
// Thread.sleep(500);
// } catch (InterruptedException e) {
// // ignore
// }
injector.getInstance(RoutingService.class).stop();
injector.getInstance(ClusterService.class).stop();
injector.getInstance(DiscoveryService.class).stop();
@ -211,8 +221,6 @@ public final class InternalNode implements Node {
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(RiversManager.class).stop();
injector.getInstance(IndicesClusterStateService.class).stop();
injector.getInstance(IndicesService.class).stop();
injector.getInstance(RestController.class).stop();
injector.getInstance(TransportService.class).stop();
injector.getInstance(JmxService.class).close();
@ -244,6 +252,10 @@ public final class InternalNode implements Node {
}
stopWatch.stop().start("client");
injector.getInstance(Client.class).close();
stopWatch.stop().start("indices_cluster");
injector.getInstance(IndicesClusterStateService.class).close();
stopWatch.stop().start("indices");
injector.getInstance(IndicesService.class).close();
stopWatch.stop().start("routing");
injector.getInstance(RoutingService.class).close();
stopWatch.stop().start("cluster");
@ -258,10 +270,6 @@ public final class InternalNode implements Node {
injector.getInstance(SearchService.class).close();
stopWatch.stop().start("indexers");
injector.getInstance(RiversManager.class).close();
stopWatch.stop().start("indices_cluster");
injector.getInstance(IndicesClusterStateService.class).close();
stopWatch.stop().start("indices");
injector.getInstance(IndicesService.class).close();
stopWatch.stop().start("rest");
injector.getInstance(RestController.class).close();
stopWatch.stop().start("transport");

View File

@ -202,7 +202,7 @@ public class RollingRestartStressTest {
// check the count
for (int i = 0; i < (nodes.length * 5); i++) {
CountResponse count = client.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet();
logger.info("indexed [{}], count [{}]", count.count(), indexCounter.get());
logger.info("indexed [{}], count [{}], [{}]", count.count(), indexCounter.get(), count.count() == indexCounter.get() ? "OK" : "FAIL");
if (count.count() != indexCounter.get()) {
logger.warn("count does not match!");
}
@ -254,7 +254,9 @@ public class RollingRestartStressTest {
json.endObject();
client.client().prepareIndex("test", "type1", Long.toString(idCounter.incrementAndGet()))
String id = Long.toString(idCounter.incrementAndGet());
logger.info("indexing " + id);
client.client().prepareIndex("test", "type1", id)
.setCreate(true)
.setSource(json)
.execute().actionGet();