Merge pull request #13246 from brwe/read-post-recovery

Allow reads on shards that are in POST_RECOVERY
Britta Weber 2015-09-02 09:46:01 +02:00
commit 9f49e0e7c1
2 changed files with 297 additions and 5 deletions


@@ -111,6 +111,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -191,6 +192,8 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexShardOperationCounter indexShardOperationCounter;
private EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
@Inject
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, StoreRecoveryService storeRecoveryService,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
@@ -953,8 +956,8 @@ public class IndexShard extends AbstractIndexShardComponent {
public void readAllowed() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
- if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) {
- throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when started/relocated");
+ if (readAllowedStates.contains(state) == false) {
+ throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when shard state is one of " + readAllowedStates.toString());
}
}
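
For illustration, here is a minimal self-contained sketch (not the actual IndexShard code; the enum and the exception are simplified stand-ins) of what the read gate amounts to after this change: POST_RECOVERY now joins STARTED and RELOCATED as a state in which reads are allowed.

import java.util.EnumSet;

class ReadGateSketch {
    // Simplified copy of the shard states that matter here.
    enum IndexShardState { CREATED, RECOVERING, POST_RECOVERY, STARTED, RELOCATED, CLOSED }

    // After this change, POST_RECOVERY is part of the readable states.
    private static final EnumSet<IndexShardState> READ_ALLOWED_STATES =
            EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);

    static void readAllowed(IndexShardState state) {
        if (READ_ALLOWED_STATES.contains(state) == false) {
            throw new IllegalStateException("operations only allowed when shard state is one of " + READ_ALLOWED_STATES);
        }
    }
}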


@@ -23,21 +23,29 @@ import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery;
@@ -48,6 +56,10 @@ import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
@@ -55,7 +67,10 @@ import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
import org.elasticsearch.test.disruption.*;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import org.junit.Test;
@@ -812,7 +827,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
- /** Test cluster join with issues in cluster state publishing * */
+ /**
+ * Test cluster join with issues in cluster state publishing *
+ */
@Test
public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
List<String> nodes = startCluster(2, 1);
@@ -919,6 +936,277 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
ensureStableCluster(3);
}
/*
* Tests a visibility issue if a shard is in POST_RECOVERY
*
* When a user indexes a document, then refreshes and then executes a search, and all of these requests succeed without timeouts, then
* the document must be visible for the search.
*
* When a primary is relocating from node_1 to node_2, there can be a short time during which both the old and the new primary
* are started and accept indexing and read requests. However, the new primary might not be visible to nodes
* that lag behind one cluster state. If such a node then sends a refresh to the index, this refresh request
* must reach the new primary on node_2 too. Otherwise a different node that searches on the new primary might not
* find the indexed document although a refresh was executed before.
*
* In detail:
* Cluster state 0:
* node_1: [index][0] STARTED (ShardRoutingState)
* node_2: no shard
*
* 0. primary ([index][0]) relocates from node_1 to node_2
* Cluster state 1:
* node_1: [index][0] RELOCATING (ShardRoutingState), (STARTED from IndexShardState perspective on node_1)
* node_2: [index][0] INITIALIZING (ShardRoutingState), (IndexShardState on node_2 is RECOVERING)
*
* 1. node_2 is done recovering, moves its shard to IndexShardState.POST_RECOVERY and sends a message to master that the shard is ShardRoutingState.STARTED
* Cluster state is still the same but the IndexShardState on node_2 has changed and it now accepts writes and reads:
* node_1: [index][0] RELOCATING (ShardRoutingState), (STARTED from IndexShardState perspective on node_1)
* node_2: [index][0] INITIALIZING (ShardRoutingState), (IndexShardState on node_2 is POST_RECOVERY)
*
* 2. any node receives an index request which is then executed on node_1 and node_2
*
* 3. node_3 sends a refresh but it is a little behind with cluster state processing and still on cluster state 0.
* If refresh is a broadcast operation, it is sent to node_1 only, because node_3 does not know that node_2 has a shard too
*
* 4. node_3 catches up with the cluster state and acks it to master which now can process the shard started message
* from node_2 before and updates cluster state to:
* Cluster state 2:
* node_1: [index][0] no shard
* node_2: [index][0] STARTED (ShardRoutingState), (IndexShardState on node_2 is still POST_RECOVERY)
*
* master sends this to all nodes.
*
* 5. node_4 and node_3 process cluster state 2, but node_1 and node_2 have not yet done so.
*
* If node_4 now searches for the document that was indexed before, it will search on node_2 because node_4 is on
* cluster state 2. It should be able to retrieve the document with a search because the earlier refresh was
* successful.
*/
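/*
 * A minimal client-side sketch of the visibility contract the scenario above relies on (hypothetical
 * code for illustration, not executed by this test; "client" stands for any connected Client instance):
 * an index call followed by a successful refresh must make the document visible to a subsequent count.
 *
 *   client.prepareIndex("test", "doc").setSource("{\"text\":\"a\"}").get();
 *   assertThat(client.admin().indices().prepareRefresh("test").get().getFailedShards(), equalTo(0));
 *   assertThat(client.prepareCount("test").get().getCount(), equalTo(1l));
 */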
@Test
public void testReadOnPostRecoveryShards() throws Exception {
List<BlockClusterStateProcessing> clusterStateBlocks = new ArrayList<>();
try {
configureUnicastCluster(5, null, 1);
// we could probably write a test without a dedicated master node but it is easier if we use one
Future<String> masterNodeFuture = internalCluster().startMasterOnlyNodeAsync();
// node_1 will have the shard in the beginning
Future<String> node1Future = internalCluster().startDataOnlyNodeAsync();
final String masterNode = masterNodeFuture.get();
final String node_1 = node1Future.get();
logger.info("--> creating index [test] with one shard and zero replica");
assertAcked(prepareCreate("test").setSettings(
Settings.builder().put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexShard.INDEX_REFRESH_INTERVAL, -1))
.addMapping("doc", jsonBuilder().startObject().startObject("doc")
.startObject("properties").startObject("text").field("type", "string").endObject().endObject()
.endObject().endObject())
);
ensureGreen("test");
logger.info("--> starting three more data nodes");
List<String> nodeNamesFuture = internalCluster().startDataOnlyNodesAsync(3).get();
final String node_2 = nodeNamesFuture.get(0);
final String node_3 = nodeNamesFuture.get(1);
final String node_4 = nodeNamesFuture.get(2);
logger.info("--> running cluster_health");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setWaitForNodes("5")
.setWaitForRelocatingShards(0)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
logger.info("--> move shard from node_1 to node_2, and wait for relocation to finish");
// block cluster state updates on node_3 so that it only sees the shard on node_1
BlockClusterStateProcessing disruptionNode3 = new BlockClusterStateProcessing(node_3, getRandom());
clusterStateBlocks.add(disruptionNode3);
internalCluster().setDisruptionScheme(disruptionNode3);
disruptionNode3.startDisrupting();
// register a Tracer that notifies begin and end of a relocation
MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2);
CountDownLatch beginRelocationLatchNode2 = new CountDownLatch(1);
CountDownLatch endRelocationLatchNode2 = new CountDownLatch(1);
transportServiceNode2.addTracer(new StartRecoveryToShardStaredTracer(logger, beginRelocationLatchNode2, endRelocationLatchNode2));
// prepare disruptions that will block cluster state updates on node_1 and node_2 so that we end up with two primaries
BlockClusterStateProcessing disruptionNode2 = new BlockClusterStateProcessing(node_2, getRandom());
clusterStateBlocks.add(disruptionNode2);
disruptionNode2.applyToCluster(internalCluster());
BlockClusterStateProcessing disruptionNode1 = new BlockClusterStateProcessing(node_1, getRandom());
clusterStateBlocks.add(disruptionNode1);
disruptionNode1.applyToCluster(internalCluster());
logger.info("--> move shard from node_1 to node_2");
// don't block on the relocation: cluster state updates are blocked on node_3 and the relocation would time out
Future<ClusterRerouteResponse> rerouteFuture = internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2)).setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)).execute();
logger.info("--> wait for relocation to start");
// wait for relocation to start
beginRelocationLatchNode2.await();
// start to block cluster state updates on node_1 and node_2 so that we end up with two primaries
// one STARTED on node_1 and one in POST_RECOVERY on node_2
disruptionNode1.startDisrupting();
disruptionNode2.startDisrupting();
endRelocationLatchNode2.await();
final Client node3Client = internalCluster().client(node_3);
final Client node2Client = internalCluster().client(node_2);
final Client node1Client = internalCluster().client(node_1);
final Client node4Client = internalCluster().client(node_4);
logger.info("--> index doc");
logLocalClusterStates(node1Client, node2Client, node3Client, node4Client);
assertTrue(node3Client.prepareIndex("test", "doc").setSource("{\"text\":\"a\"}").get().isCreated());
// sometimes refresh and sometimes flush
int refreshOrFlushType = randomIntBetween(1, 2);
switch (refreshOrFlushType) {
case 1: {
logger.info("--> refresh from node_3");
RefreshResponse refreshResponse = node3Client.admin().indices().prepareRefresh().get();
assertThat(refreshResponse.getFailedShards(), equalTo(0));
// with zero replicas the total would normally be num replicas + 1 = 1, but the shard is
// relocating, so the source and the target copy both count as successful, giving 2 total and 2 successful
assertThat(refreshResponse.getTotalShards(), equalTo(2));
assertThat(refreshResponse.getSuccessfulShards(), equalTo(2));
break;
}
case 2: {
logger.info("--> flush from node_3");
FlushResponse flushResponse = node3Client.admin().indices().prepareFlush().get();
assertThat(flushResponse.getFailedShards(), equalTo(0));
// with zero replicas the total would normally be num replicas + 1 = 1, but the shard is
// relocating, so the source and the target copy both count as successful, giving 2 total and 2 successful
assertThat(flushResponse.getTotalShards(), equalTo(2));
assertThat(flushResponse.getSuccessfulShards(), equalTo(2));
break;
}
default:
fail("this is test bug, number should be between 1 and 2");
}
// now stop disrupting so that node_3 can ack last cluster state to master and master can continue
// to publish the next cluster state
logger.info("--> stop disrupting node_3");
disruptionNode3.stopDisrupting();
rerouteFuture.get();
logger.info("--> wait for node_4 to get new cluster state");
// wait until node_4 actually has the new cluster state in which node_1 has no shard
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState clusterState = node4Client.admin().cluster().prepareState().setLocal(true).get().getState();
// get the node id from the name. TODO: Is there a better way to do this?
String nodeId = null;
for (RoutingNode node : clusterState.getRoutingNodes()) {
if (node.node().name().equals(node_1)) {
nodeId = node.nodeId();
}
}
assertNotNull(nodeId);
// check that node_1 does not have the shard in local cluster state
assertFalse(clusterState.getRoutingNodes().routingNodeIter(nodeId).hasNext());
}
});
logger.info("--> run count from node_4");
logLocalClusterStates(node1Client, node2Client, node3Client, node4Client);
CountResponse countResponse = node4Client.prepareCount("test").setPreference("local").get();
assertThat(countResponse.getCount(), equalTo(1l));
logger.info("--> stop disrupting node_1 and node_2");
disruptionNode2.stopDisrupting();
disruptionNode1.stopDisrupting();
// wait for relocation to finish
logger.info("--> wait for relocation to finish");
clusterHealth = client().admin().cluster().prepareHealth()
.setWaitForRelocatingShards(0)
.get();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
} catch (AssertionError e) {
for (BlockClusterStateProcessing blockClusterStateProcessing : clusterStateBlocks) {
blockClusterStateProcessing.stopDisrupting();
}
throw e;
}
}
/**
* This Tracer can be used to signal the start of a recovery and the shard-started event that is sent after the translog was copied
*/
public static class StartRecoveryToShardStaredTracer extends MockTransportService.Tracer {
private final ESLogger logger;
private final CountDownLatch beginRelocationLatch;
private final CountDownLatch sentShardStartedLatch;
public StartRecoveryToShardStaredTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch sentShardStartedLatch) {
this.logger = logger;
this.beginRelocationLatch = beginRelocationLatch;
this.sentShardStartedLatch = sentShardStartedLatch;
}
@Override
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
if (action.equals(RecoverySource.Actions.START_RECOVERY)) {
logger.info("sent: {}, relocation starts", action);
beginRelocationLatch.countDown();
}
if (action.equals(ShardStateAction.SHARD_STARTED_ACTION_NAME)) {
logger.info("sent: {}, shard started", action);
sentShardStartedLatch.countDown();
}
}
}
private void logLocalClusterStates(Client... clients) {
int counter = 1;
for (Client client : clients) {
ClusterState clusterState = client.admin().cluster().prepareState().setLocal(true).get().getState();
logger.info("--> cluster state on node_{} {}", counter, clusterState.prettyPrint());
counter++;
}
}
/**
* This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
* node but is already deleted on the source node. Search requests should still work.
*/
@Test
public void searchWithRelocationAndSlowClusterStateProcessing() throws Exception {
configureUnicastCluster(3, null, 1);
Future<String> masterNodeFuture = internalCluster().startMasterOnlyNodeAsync();
Future<String> node_1Future = internalCluster().startDataOnlyNodeAsync();
final String node_1 = node_1Future.get();
final String masterNode = masterNodeFuture.get();
logger.info("--> creating index [test] with one shard and on replica");
assertAcked(prepareCreate("test").setSettings(
Settings.builder().put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
);
ensureGreen("test");
Future<String> node_2Future = internalCluster().startDataOnlyNodeAsync();
final String node_2 = node_2Future.get();
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("doc").setSource("{\"int_field\":1}"));
}
indexRandom(true, indexRequestBuilderList);
SingleNodeDisruption disruption = new BlockClusterStateProcessing(node_2, getRandom());
internalCluster().setDisruptionScheme(disruption);
MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2);
CountDownLatch beginRelocationLatch = new CountDownLatch(1);
CountDownLatch endRelocationLatch = new CountDownLatch(1);
transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch));
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_2)).get();
// wait for relocation to start
beginRelocationLatch.await();
disruption.startDisrupting();
// wait for relocation to finish
endRelocationLatch.await();
// now search for the documents and see if we get a reply
assertThat(client().prepareCount().get().getCount(), equalTo(100l));
}
@Test
public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception {
// test for https://github.com/elastic/elasticsearch/issues/8823
@@ -932,6 +1220,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
ensureGreen();
internalCluster().restartNode(masterNode, new InternalTestCluster.RestartCallback() {
@Override
public boolean clearData(String nodeName) {
return true;
}