Adapt SyncedFlushService (#37691)

This commit is contained in:
Tanguy Leroux 2019-01-23 11:08:54 +01:00 committed by GitHub
parent 701d89caa2
commit 6130d15172
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 16 deletions

View File

@ -32,7 +32,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
@ -290,16 +289,14 @@ public class SyncedFlushService implements IndexEventListener {
listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results)); listener.onResponse(new ShardsSyncedFlushResult(shardId, existingSyncId, totalShards, results));
} }
final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) { final IndexShardRoutingTable getShardRoutingTable(final ShardId shardId, final ClusterState state) {
final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.getIndexName()); final IndexMetaData indexMetaData = state.getMetaData().index(shardId.getIndex());
if (indexRoutingTable == null) { if (indexMetaData == null) {
IndexMetaData index = state.getMetaData().index(shardId.getIndex()); throw new IndexNotFoundException(shardId.getIndexName());
if (index != null && index.getState() == IndexMetaData.State.CLOSE) { } else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(shardId.getIndex()); throw new IndexClosedException(shardId.getIndex());
} }
throw new IndexNotFoundException(shardId.getIndexName()); final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(indexMetaData.getIndex()).shard(shardId.id());
}
final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.id());
if (shardRoutingTable == null) { if (shardRoutingTable == null) {
throw new ShardNotFoundException(shardId); throw new ShardNotFoundException(shardId);
} }

View File

@ -20,11 +20,13 @@ package org.elasticsearch.indices.flush;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
@ -38,6 +40,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
public void testModificationPreventsFlushing() throws InterruptedException { public void testModificationPreventsFlushing() throws InterruptedException {
@ -130,22 +134,26 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
} }
public void testSyncFailsOnIndexClosedOrMissing() throws InterruptedException { public void testSyncFailsOnIndexClosedOrMissing() throws InterruptedException {
createIndex("test"); createIndex("test", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
IndexService test = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test")); IndexService test = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test"));
IndexShard shard = test.getShardOrNull(0); final IndexShard shard = test.getShardOrNull(0);
assertNotNull(shard);
final ShardId shardId = shard.shardId();
final SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener();
flushService.attemptSyncedFlush(new ShardId("test", "_na_", 1), listener); flushService.attemptSyncedFlush(new ShardId(shard.shardId().getIndex(), 1), listener);
listener.latch.await(); listener.latch.await();
assertNotNull(listener.error); assertNotNull(listener.error);
assertNull(listener.result); assertNull(listener.result);
assertEquals(ShardNotFoundException.class, listener.error.getClass()); assertEquals(ShardNotFoundException.class, listener.error.getClass());
assertEquals("no such shard", listener.error.getMessage()); assertEquals("no such shard", listener.error.getMessage());
final ShardId shardId = shard.shardId(); assertAcked(client().admin().indices().prepareClose("test"));
client().admin().indices().prepareClose("test").get();
listener = new SyncedFlushUtil.LatchedListener(); listener = new SyncedFlushUtil.LatchedListener();
flushService.attemptSyncedFlush(shardId, listener); flushService.attemptSyncedFlush(shardId, listener);
listener.latch.await(); listener.latch.await();