Added new IndicesLifecycle.Listener method that allows to listen for any IndexShardState internal change.

Closes #4413
This commit is contained in:
Luca Cavanna 2013-12-11 16:05:17 +01:00
parent 4e7ce4ee02
commit 173a91bb46
5 changed files with 173 additions and 16 deletions

View File

@ -351,6 +351,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
indicesLifecycle.afterIndexShardCreated(indexShard);
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();

View File

@ -289,8 +289,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
synchronized (mutex) {
// do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
if (state == IndexShardState.POST_RECOVERY) {
logger.debug("state: [{}]->[{}], reason [global state is [{}]]", state, IndexShardState.STARTED, newRouting.state());
state = IndexShardState.STARTED;
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
movedToStarted = true;
} else {
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
@ -314,7 +313,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
public IndexShardState recovering(String reason) throws IndexShardStartedException,
IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
synchronized (mutex) {
IndexShardState returnValue = state;
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
@ -330,9 +328,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state == IndexShardState.POST_RECOVERY) {
throw new IndexShardRecoveringException(shardId);
}
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RECOVERING, reason);
state = IndexShardState.RECOVERING;
return returnValue;
return changeState(IndexShardState.RECOVERING, reason);
}
}
@ -341,8 +337,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
}
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.RELOCATED, reason);
state = IndexShardState.RELOCATED;
changeState(IndexShardState.RELOCATED, reason);
}
return this;
}
@ -352,6 +347,20 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return state;
}
/**
* Changes the state of the current shard
* @param newState the new shard state
* @param reason the reason for the state change
* @return the previous shard state
*/
private IndexShardState changeState(IndexShardState newState, String reason) {
logger.debug("state: [{}]->[{}], reason [{}]", state, newState, reason);
IndexShardState previousState = state;
state = newState;
this.indicesLifecycle.indexShardStateChanged(this, previousState, reason);
return previousState;
}
@Override
public Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException {
long startTime = System.nanoTime();
@ -653,10 +662,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
mergeScheduleFuture = null;
}
}
if (logger.isDebugEnabled()) {
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
}
state = IndexShardState.CLOSED;
changeState(IndexShardState.CLOSED, reason);
}
}
@ -681,8 +687,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
engine.start();
startScheduledTasksIfNeeded();
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.POST_RECOVERY, reason);
state = IndexShardState.POST_RECOVERY;
changeState(IndexShardState.POST_RECOVERY, reason);
}
indicesLifecycle.afterIndexShardPostRecovery(this);
return this;
@ -725,8 +730,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
translog.clearUnreferenced();
engine.refresh(new Engine.Refresh("recovery_finalization").force(true));
synchronized (mutex) {
logger.debug("state: [{}]->[{}], reason [post recovery]", state, IndexShardState.POST_RECOVERY);
state = IndexShardState.POST_RECOVERY;
changeState(IndexShardState.POST_RECOVERY, "post recovery");
}
indicesLifecycle.afterIndexShardPostRecovery(this);
startScheduledTasksIfNeeded();

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
@ -132,6 +133,19 @@ public interface IndicesLifecycle {
public void afterIndexShardClosed(ShardId shardId) {
}
/**
* Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes.
* The order of concurrent events is preserved. The execution must be lightweight.
*
* @param indexShard the shard the new state was applied to
* @param previousState the previous index shard state if there was one, null otherwise
* @param currentState the new shard state
* @param reason the reason for the state change if there is one, null otherwise
*/
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
}
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
@ -160,4 +161,14 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
}
}
}
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, @Nullable String reason) {
for (Listener listener : listeners) {
try {
listener.indexShardStateChanged(indexShard, previousState, indexShard.state(), reason);
} catch (Throwable t) {
logger.warn("{} failed to invoke index shard state changed callback", t, indexShard.shardId());
}
}
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.elasticsearch.indices;
import com.google.common.collect.Maps;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION;
import static org.elasticsearch.common.settings.ImmutableSettings.builder;
import static org.elasticsearch.index.shard.IndexShardState.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
@ClusterScope(scope = Scope.TEST, numNodes = 0)
public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest {
@Test
public void testIndexStateShardChanged() throws Throwable {
//start with a single node
String node1 = cluster().startNode();
IndexShardStateChangeListener stateChangeListenerNode1 = new IndexShardStateChangeListener();
//add a listener that keeps track of the shard state changes
cluster().getInstance(IndicesLifecycle.class, node1).addListener(stateChangeListenerNode1);
//create an index
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 6, SETTING_NUMBER_OF_REPLICAS, 0));
ensureGreen();
//new shards got started
assertShardStatesMatch(stateChangeListenerNode1, 6, CREATED, RECOVERING, POST_RECOVERY, STARTED);
//add a node: 3 out of the 6 shards will be relocated to it
//disable allocation before starting a new node, as we need to register the listener first
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(builder().put(CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, true)));
String node2 = cluster().startNode();
IndexShardStateChangeListener stateChangeListenerNode2 = new IndexShardStateChangeListener();
//add a listener that keeps track of the shard state changes
cluster().getInstance(IndicesLifecycle.class, node2).addListener(stateChangeListenerNode2);
//re-enable allocation
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(builder().put(CLUSTER_ROUTING_ALLOCATION_DISABLE_ALLOCATION, false)));
ensureGreen();
//the 3 relocated shards get closed on the first node
assertShardStatesMatch(stateChangeListenerNode1, 3, CLOSED);
//the 3 relocated shards get created on the second node
assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED);
//increase replicas from 0 to 1
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(builder().put(SETTING_NUMBER_OF_REPLICAS, 1)));
ensureGreen();
//3 replicas are allocated to the first node
assertShardStatesMatch(stateChangeListenerNode1, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED);
//3 replicas are allocated to the second node
assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED);
//close the index
assertAcked(client().admin().indices().prepareClose("test"));
assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED);
assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED);
}
private static void assertShardStatesMatch(IndexShardStateChangeListener stateChangeListener, int numShards, IndexShardState... shardStates) {
assertThat(stateChangeListener.shardStates.size(), equalTo(numShards));
for (List<IndexShardState> indexShardStates : stateChangeListener.shardStates.values()) {
assertThat(indexShardStates, notNullValue());
assertThat(indexShardStates.size(), equalTo(shardStates.length));
for (int i = 0; i < shardStates.length; i++) {
assertThat(indexShardStates.get(i), equalTo(shardStates[i]));
}
}
stateChangeListener.shardStates.clear();
}
private static class IndexShardStateChangeListener extends IndicesLifecycle.Listener {
//we keep track of all the states (ordered) a shard goes through
final ConcurrentMap<ShardId, List<IndexShardState>> shardStates = Maps.newConcurrentMap();
@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState newState, @Nullable String reason) {
List<IndexShardState> shardStates = this.shardStates.putIfAbsent(indexShard.shardId(),
new CopyOnWriteArrayList<IndexShardState>(new IndexShardState[]{newState}));
if (shardStates != null) {
shardStates.add(newState);
}
}
}
}