Add `beforeIndexAddedToCluster` callback

This callback is executed only once, on the master node during an
index's creation. An exception thrown during this listener will cancel
the index creation.

This also adds checks in `IndicesClusterStateService` for the
indexService being null as well as if the `indicesService.createIndex`
throws an exception on data nodes after an index has already been
created.
This commit is contained in:
Lee Hinman 2015-01-30 12:57:53 -07:00
parent b2010f788d
commit 9fe84062a1
6 changed files with 96 additions and 6 deletions

View File

@ -65,10 +65,7 @@ import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexCreationException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.indices.*;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
@ -453,6 +450,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
throw e;
}
indexService.indicesLifecycle().beforeIndexAddedToCluster(new Index(request.index()),
indexMetaData.settings());
MetaData newMetaData = MetaData.builder(currentState.metaData())
.put(indexMetaData, false)
.build();

View File

@ -162,6 +162,10 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
return shards.size();
}
public InternalIndicesLifecycle indicesLifecycle() {
return this.indicesLifecycle;
}
@Override
public Iterator<IndexShard> iterator() {
return Iterators.transform(shards.values().iterator(), new Function<Tuple<IndexShard, Injector>, IndexShard>() {

View File

@ -62,7 +62,15 @@ public interface IndicesLifecycle {
}
/**
* Called before the index gets created.
* Called on the Master node only before the index is created
*/
public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) {
}
/**
* Called before the index gets created. Note that this is also called
* when the index is created on data nodes
*/
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {

View File

@ -65,6 +65,17 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
}
}
public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) {
for (Listener listener : listeners) {
try {
listener.beforeIndexAddedToCluster(index, indexSettings);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke before index added to cluster callback", t, index.name());
throw t;
}
}
}
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
for (Listener listener : listeners) {
try {

View File

@ -220,6 +220,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// handle closed indices, since they are not allocated on a node once they are closed
// so applyDeletedIndices might not take them into account
for (IndexService indexService : indicesService) {
if (indexService == null) {
// already deleted on us, ignore it
continue;
}
String index = indexService.index().getName();
IndexMetaData indexMetaData = event.state().metaData().index(index);
if (indexMetaData != null && indexMetaData.state() == IndexMetaData.State.CLOSE) {
@ -247,6 +251,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private void applyDeletedIndices(final ClusterChangedEvent event) {
for (IndexService indexService : indicesService) {
if (indexService == null) {
// got deleted already on us, skip
continue;
}
final String index = indexService.index().name();
if (!event.state().metaData().hasIndex(index)) {
if (logger.isDebugEnabled()) {
@ -264,6 +272,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
IntOpenHashSet newShardIds = new IntOpenHashSet();
for (IndexService indexService : indicesService) {
if (indexService == null) {
// already deleted on us
continue;
}
String index = indexService.index().name();
IndexMetaData indexMetaData = event.state().metaData().index(index);
if (indexMetaData == null) {
@ -308,7 +320,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isDebugEnabled()) {
logger.debug("[{}] creating index", indexMetaData.index());
}
indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), event.state().nodes().localNode().id());
try {
indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), event.state().nodes().localNode().id());
} catch (Exception e) {
logger.warn("failed to create index [{}]", e, indexMetaData.index());
}
}
}
}
@ -328,6 +344,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
String index = indexMetaData.index();
IndexService indexService = indicesService.indexServiceSafe(index);
if (indexService == null) {
// already deleted on us, ignore it
continue;
}
IndexSettingsService indexSettingsService = indexService.injector().getInstance(IndexSettingsService.class);
indexSettingsService.refreshSettings(indexMetaData.settings());
}

View File

@ -22,6 +22,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -39,6 +40,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@ -49,10 +51,55 @@ import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest {
@Test
public void testBeforeIndexAddedToCluster() throws Exception {
String node1 = internalCluster().startNode();
String node2 = internalCluster().startNode();
String node3 = internalCluster().startNode();
final AtomicInteger beforeAddedCount = new AtomicInteger(0);
final AtomicInteger allCreatedCount = new AtomicInteger(0);
IndicesLifecycle.Listener listener = new IndicesLifecycle.Listener() {
@Override
public void beforeIndexAddedToCluster(Index index, @IndexSettings Settings indexSettings) {
beforeAddedCount.incrementAndGet();
if (indexSettings.getAsBoolean("index.fail", false)) {
throw new ElasticsearchException("failing on purpose");
}
}
@Override
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
allCreatedCount.incrementAndGet();
}
};
internalCluster().getInstance(IndicesLifecycle.class, node1).addListener(listener);
internalCluster().getInstance(IndicesLifecycle.class, node2).addListener(listener);
internalCluster().getInstance(IndicesLifecycle.class, node3).addListener(listener);
client().admin().indices().prepareCreate("test")
.setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get();
ensureGreen("test");
assertThat("beforeIndexAddedToCluster called only once", beforeAddedCount.get(), equalTo(1));
assertThat("beforeIndexCreated called on each data node", allCreatedCount.get(), greaterThanOrEqualTo(3));
try {
client().admin().indices().prepareCreate("failed").setSettings("index.fail", true).get();
fail("should have thrown an exception during creation");
} catch (Exception e) {
assertTrue(e.getMessage().contains("failing on purpose"));
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
assertFalse(resp.getState().routingTable().indicesRouting().keySet().contains("failed"));
}
}
@Test
public void testIndexStateShardChanged() throws Throwable {