[INDEX] Add before/after indexDeleted callbacks to IndicesLifecycle

In order to implement #8551 correctly without causing problems of relocating
shards we need to be informed if an index is actually deleted. This commit adds
more callbacks to the listener and makes deleteIndex a dedicated method on IndicesService
This commit is contained in:
Simon Willnauer 2014-11-20 13:55:47 +01:00
parent 26b4ebcd00
commit d5d5dece56
7 changed files with 250 additions and 37 deletions

View File

@ -146,6 +146,28 @@ public interface IndicesLifecycle {
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
}
/**
* Called after the index has been deleted.
* This listener method is invoked after {@link #afterIndexClosed(org.elasticsearch.index.Index)}
* when an index is deleted
*
* @param index The index
*/
public void afterIndexDeleted(Index index) {
}
/**
* Called before the index gets deleted.
* This listener method is invoked after
* {@link #beforeIndexClosed(org.elasticsearch.index.service.IndexService)} when an index is deleted
*
* @param indexService The index service
*/
public void beforeIndexDeleted(IndexService indexService) {
}
}
}

View File

@ -78,9 +78,24 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticsearchException;
/**
* Removes the given index from this service and releases all associated resources. Persistent parts of the index
* like the shards files, state and transaction logs are kept around in the case of a disaster recovery.
* @param index the index to remove
* @param reason the high level reason causing this removal
*/
void removeIndex(String index, String reason) throws ElasticsearchException;
void removeIndex(String index, String reason, @Nullable IndexCloseListener listener) throws ElasticsearchException;
/**
* Deletes the given index. Persistent parts of the index
* like the shards files, state and transaction logs are removed once all resources are released.
*
* Equivalent to {@link #removeIndex(String, String)} but fires
* different lifecycle events to ensure pending resources of this index are immediately removed.
* @param index the index to delete
* @param reason the high level reason causing this delete
*/
void deleteIndex(String index, String reason) throws ElasticsearchException;
/**
* A listener interface that can be used to get notification once a shard or all shards

View File

@ -132,6 +132,26 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
}
}
public void beforeIndexDeleted(IndexService indexService) {
for (Listener listener : listeners) {
try {
listener.beforeIndexDeleted(indexService);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke before index deleted callback", t, indexService.index().name());
}
}
}
public void afterIndexDeleted(Index index) {
for (Listener listener : listeners) {
try {
listener.afterIndexDeleted(index);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke after index deleted callback", t, index.name());
}
}
}
public void afterIndexClosed(Index index) {
for (Listener listener : listeners) {
try {

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
@ -71,6 +72,7 @@ import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.plugins.IndexPluginsModule;
import org.elasticsearch.plugins.PluginsService;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -100,6 +102,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
private final PluginsService pluginsService;
private final NodeEnvironment nodeEnv;
private final Map<String, Injector> indicesInjectors = new HashMap<>();
private volatile ImmutableMap<String, IndexService> indices = ImmutableMap.of();
@ -107,7 +111,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
private final OldShardsStats oldShardsStats = new OldShardsStats();
@Inject
public InternalIndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) {
public InternalIndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector, NodeEnvironment nodeEnv) {
super(settings);
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indicesAnalysisService = indicesAnalysisService;
@ -117,6 +121,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesLifecycle.addListener(oldShardsStats);
this.nodeEnv = nodeEnv;
}
@Override
@ -135,7 +140,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
@Override
public void run() {
try {
removeIndex(index, "shutdown", new IndexCloseListener() {
removeIndex(index, "shutdown", false, new IndexCloseListener() {
@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
latch.countDown();
@ -328,11 +333,44 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
@Override
public void removeIndex(String index, String reason) throws ElasticsearchException {
removeIndex(index, reason, null);
removeIndex(index, reason, false, null);
}
@Override
public void removeIndex(String index, String reason, @Nullable IndexCloseListener listener) throws ElasticsearchException {
public void deleteIndex(String index, String reason) throws ElasticsearchException {
removeIndex(index, reason, true, new IndicesService.IndexCloseListener() {
@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
try {
nodeEnv.deleteIndexDirectorySafe(index);
logger.debug("deleted index [{}] from filesystem - failures {}", index, failures);
} catch (Exception e) {
for (Throwable t : failures) {
e.addSuppressed(t);
}
logger.debug("failed to deleted index [{}] from filesystem", e, index);
// ignore - still some shards locked here
}
}
@Override
public void onShardClosed(ShardId shardId) {
try {
nodeEnv.deleteShardDirectorySafe(shardId);
logger.debug("deleted shard [{}] from filesystem", shardId);
} catch (IOException e) {
logger.warn("Can't delete shard {} ", e, shardId);
}
}
@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {
}
});
}
private void removeIndex(String index, String reason, boolean delete, @Nullable IndexCloseListener listener) throws ElasticsearchException {
final IndexService indexService;
final Injector indexInjector;
synchronized (this) {
@ -346,7 +384,11 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
indexService = tmpMap.remove(index);
indices = ImmutableMap.copyOf(tmpMap);
}
indicesLifecycle.beforeIndexClosed(indexService);
if (delete) {
indicesLifecycle.beforeIndexDeleted(indexService);
}
for (Class<? extends CloseableIndexComponent> closeable : pluginsService.indexServices()) {
indexInjector.getInstance(closeable).close();
@ -378,6 +420,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
logger.debug("[{}] closed... (reason [{}])", index, reason);
indicesLifecycle.afterIndexClosed(indexService.index());
if (delete) {
indicesLifecycle.afterIndexDeleted(indexService.index());
}
}

View File

@ -257,33 +257,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
}
removeIndex(index, "index no longer part of the metadata", new IndicesService.IndexCloseListener() {
@Override
public void onAllShardsClosed(Index index, List<Throwable> failures) {
try {
nodeEnvironment.deleteIndexDirectorySafe(index);
logger.debug("deleted index [{}] from filesystem", index);
} catch (Exception e) {
logger.debug("failed to deleted index [{}] from filesystem", e, index);
// ignore - still some shards locked here
}
}
@Override
public void onShardClosed(ShardId shardId) {
try {
nodeEnvironment.deleteShardDirectorySafe(shardId);
logger.debug("deleted shard [{}] from filesystem", shardId);
} catch (IOException e) {
logger.warn("Can't delete shard {} ", e, shardId);
}
}
@Override
public void onShardCloseFailed(ShardId shardId, Throwable t) {
}
});
deleteIndex(index, "index no longer part of the metadata");
}
}
}
@ -869,15 +843,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private void removeIndex(String index, String reason) {
removeIndex(index, reason, null);
}
private void removeIndex(String index, String reason, @Nullable IndicesService.IndexCloseListener listener) {
try {
indicesService.removeIndex(index, reason, listener);
indicesService.removeIndex(index, reason);
} catch (Throwable e) {
logger.warn("failed to clean index ({})", e, reason);
}
clearSeenMappings(index);
}
private void clearSeenMappings(String index) {
// clear seen mappings as well
for (Tuple<String, String> tuple : seenMappings.keySet()) {
if (tuple.v1().equals(index)) {
@ -886,6 +861,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
}
private void deleteIndex(String index, String reason) {
try {
indicesService.deleteIndex(index, reason);
} catch (Throwable e) {
logger.warn("failed to delete index ({})", e, reason);
}
// clear seen mappings as well
clearSeenMappings(index);
}
private class FailedEngineHandler implements Engine.FailedEngineListener {
@Override
public void onFailedEngine(final ShardId shardId, final String reason, final @Nullable Throwable failure) {

View File

@ -0,0 +1,95 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION;
import static org.elasticsearch.common.settings.ImmutableSettings.builder;
import static org.elasticsearch.index.shard.IndexShardState.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
public class IndicesLifecycleListenerSingleNodeTests extends ElasticsearchSingleNodeTest {
@Override
protected boolean resetNodeAfterTest() {
return true;
}
@Test
public void testCloseDeleteCallback() throws Throwable {
final AtomicInteger counter = new AtomicInteger(1);
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
ensureGreen();
getInstanceFromNode(IndicesLifecycle.class).addListener(new IndicesLifecycle.Listener() {
@Override
public void afterIndexClosed(Index index) {
assertEquals(counter.get(), 3);
counter.incrementAndGet();
}
@Override
public void beforeIndexClosed(IndexService indexService) {
assertEquals(counter.get(), 1);
counter.incrementAndGet();
}
@Override
public void afterIndexDeleted(Index index) {
assertEquals(counter.get(), 4);
counter.incrementAndGet();
}
@Override
public void beforeIndexDeleted(IndexService indexService) {
assertEquals(counter.get(), 2);
counter.incrementAndGet();
}
});
assertAcked(client().admin().indices().prepareDelete("test").get());
assertEquals(5, counter.get());
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -205,4 +206,34 @@ public abstract class ElasticsearchSingleNodeTest extends ElasticsearchTestCase
return new TestSearchContext(threadPool, pageCacheRecycler, bigArrays, indexService, indexService.cache().filter(), indexService.fieldData());
}
/**
* Ensures the cluster has a green state via the cluster health API. This method will also wait for relocations.
* It is useful to ensure that all action on the cluster have finished and all shards that were currently relocating
* are now allocated and started.
*/
public ClusterHealthStatus ensureGreen(String... indices) {
return ensureGreen(TimeValue.timeValueSeconds(30), indices);
}
/**
* Ensures the cluster has a green state via the cluster health API. This method will also wait for relocations.
* It is useful to ensure that all action on the cluster have finished and all shards that were currently relocating
* are now allocated and started.
*
* @param timeout time out value to set on {@link org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest}
*/
public ClusterHealthStatus ensureGreen(TimeValue timeout, String... indices) {
ClusterHealthResponse actionGet = client().admin().cluster()
.health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
if (actionGet.isTimedOut()) {
logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
}
assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.debug("indices {} are green", indices.length == 0 ? "[_all]" : indices);
return actionGet.getStatus();
}
}