call sync flush when shard is marked as inactive

This commit is contained in:
Britta Weber 2015-05-05 17:19:09 +02:00
parent fbc26ca49b
commit 8349a3ee1e
6 changed files with 47 additions and 3 deletions

View File

@ -999,6 +999,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public void markAsInactive() {
updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
indicesLifecycle.onShardInactive(this);
}
public final boolean isFlushOnClose() {

View File

@ -201,6 +201,15 @@ public interface IndicesLifecycle {
public void beforeIndexDeleted(IndexService indexService) {
}
/**
* Called when a shard is marked as inactive
*
* @param indexShard The shard that was marked inactive
*/
public void onShardInactive(IndexShard indexShard) {
}
}
}

View File

@ -245,4 +245,15 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
}
}
}
public void onShardInactive(IndexShard indexShard) {
for (Listener listener : listeners) {
try {
listener.onShardInactive(indexShard);
} catch (Throwable t) {
logger.warn("{} failed to invoke on shard inactive callback", t, indexShard.shardId());
throw t;
}
}
}
}

View File

@ -84,9 +84,18 @@ public class SyncedFlushService extends AbstractComponent {
preSyncTimeout = settings.getAsTime(SETTING_PRE_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5));
syncTimeout = settings.getAsTime(SETTING_SYNC_TIMEOUT, TimeValue.timeValueMinutes(5));
inflightOpsTimeout = settings.getAsTime(SETTING_IN_FLIGHT_OPS_TIMEOUT, TimeValue.timeValueMinutes(5));
indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
@Override
public void onShardInactive(IndexShard indexShard) {
// we only want to call sync flush once, so only trigger it when we are on a primary
if (indexShard.routingEntry().primary()) {
attemptSyncedFlush(indexShard.shardId());
}
}
});
}
public SyncedFlushResult attemptSyncedFlush(ShardId shardId) throws ExecutionException, InterruptedException {
public SyncedFlushResult attemptSyncedFlush(ShardId shardId) {
final ClusterState state = clusterService.state();
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.index().name());
if (indexRoutingTable == null) {

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -26,6 +27,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.Test;
@ -37,6 +39,8 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
@ -225,8 +229,18 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
indexShard.decrementOperationCounter();
indexShard.decrementOperationCounter();
assertEquals(1, indexShard.getOperationsCount());
}
@Test
public void testMarkAsInactiveTriggersSyncedFlush() {
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
client().prepareIndex("test", "test").setSource("{}").get();
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
indicesService.indexService("test").shard(0).markAsInactive();
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}
public static ShardStateMetaData load(ESLogger logger, Path... shardPaths) throws IOException {

View File

@ -86,5 +86,5 @@ public class IndicesLifecycleListenerSingleNodeTests extends ElasticsearchSingle
assertAcked(client().admin().indices().prepareDelete("test").get());
assertEquals(7, counter.get());
}
}