Gateway: Chunk based storage broken, fails to recover from gateway, closes #318.
This commit is contained in:
parent
12ef12f7aa
commit
d12c757824
|
@ -594,7 +594,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
|
|||
String firstFileToRecover = fileToRecover.name();
|
||||
if (!blobs.containsKey(fileToRecover.name())) {
|
||||
// chunking, append part0 to it
|
||||
firstFileToRecover = fileToRecover.name() + "part0";
|
||||
firstFileToRecover = fileToRecover.name() + ".part0";
|
||||
}
|
||||
if (!blobs.containsKey(firstFileToRecover)) {
|
||||
// no file, what to do, what to do?
|
||||
|
@ -612,10 +612,11 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
|
|||
|
||||
@Override public synchronized void onCompleted() {
|
||||
int part = partIndex.incrementAndGet();
|
||||
String partName = fileToRecover + ".part" + part;
|
||||
String partName = fileToRecover.name() + ".part" + part;
|
||||
if (blobs.containsKey(partName)) {
|
||||
// continue with the new part
|
||||
indexContainer.readBlob(partName, this);
|
||||
return;
|
||||
} else {
|
||||
// we are done...
|
||||
try {
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.gateway.Gateway;
|
||||
import org.elasticsearch.node.internal.InternalNode;
|
||||
import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||
|
@ -58,6 +60,9 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
|
|||
@Test public void testSnapshotOperations() throws Exception {
|
||||
startNode("server1");
|
||||
|
||||
// get the environment, so we can clear the work dir when needed
|
||||
Environment environment = ((InternalNode) node("server1")).injector().getInstance(Environment.class);
|
||||
|
||||
// Translog tests
|
||||
|
||||
logger.info("Creating index [{}]", "test");
|
||||
|
@ -122,7 +127,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
|
|||
|
||||
logger.info("Closing the server");
|
||||
closeNode("server1");
|
||||
logger.info("Starting the server, should recover from the gateway (both index and translog)");
|
||||
logger.info("Starting the server, should recover from the gateway (both index and translog) and reuse work dir");
|
||||
startNode("server1");
|
||||
|
||||
logger.info("Running Cluster Health (wait for the shards to startup)");
|
||||
|
@ -141,6 +146,30 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
|
|||
getResponse = client("server1").get(getRequest("test").type("type1").id("3")).actionGet();
|
||||
assertThat(getResponse.sourceAsString(), equalTo(source("3", "test")));
|
||||
|
||||
logger.info("Closing the server");
|
||||
closeNode("server1");
|
||||
logger.info("Clearing cluster work dir, so there will be a full recovery from the gateway");
|
||||
FileSystemUtils.deleteRecursively(environment.workWithClusterFile());
|
||||
logger.info("Starting the server, should recover from the gateway (both index and translog) without reusing work dir");
|
||||
startNode("server1");
|
||||
|
||||
logger.info("Running Cluster Health (wait for the shards to startup)");
|
||||
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
|
||||
logger.info("Done Cluster Health, status " + clusterHealth.status());
|
||||
assertThat(clusterHealth.timedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
|
||||
|
||||
logger.info("Getting #1, should not exists");
|
||||
getResponse = client("server1").get(getRequest("test").type("type1").id("1")).actionGet();
|
||||
assertThat(getResponse.exists(), equalTo(false));
|
||||
logger.info("Getting #2 (not from the translog, but from the index)");
|
||||
getResponse = client("server1").get(getRequest("test").type("type1").id("2")).actionGet();
|
||||
assertThat(getResponse.sourceAsString(), equalTo(source("2", "test")));
|
||||
logger.info("Getting #3 (from the translog)");
|
||||
getResponse = client("server1").get(getRequest("test").type("type1").id("3")).actionGet();
|
||||
assertThat(getResponse.sourceAsString(), equalTo(source("3", "test")));
|
||||
|
||||
|
||||
logger.info("Flushing, so we have actual content in the index files (#3 should be in the index now as well)");
|
||||
client("server1").admin().indices().flush(flushRequest("test")).actionGet();
|
||||
|
||||
|
|
Loading…
Reference in New Issue