more work on local gateway, start integration test it

This commit is contained in:
kimchy 2010-08-30 01:25:45 +03:00
parent 8ed54c24bd
commit d9979f8dfe
11 changed files with 114 additions and 19 deletions

View File

@ -378,6 +378,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
CommitPoints commitPoints = new CommitPoints(commitPointsList);
if (commitPoints.commits().isEmpty()) {
// no commit points, clean the store just so we won't recover wrong files
try {
indexShard.store().deleteContent();
} catch (IOException e) {
logger.warn("failed to clean store before starting shard", e);
}
recoveryStatus.index().startTime(System.currentTimeMillis());
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().startTime(System.currentTimeMillis());

View File

@ -347,7 +347,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
}
try {
// now we can close the translog
shardInjector.getInstance(Translog.class).close();
shardInjector.getInstance(Translog.class).close(delete);
} catch (Exception e) {
// ignore
}

View File

@ -90,7 +90,7 @@ public interface Translog extends IndexShardComponent {
/**
* Closes the transaction log.
*/
void close();
void close(boolean delete);
/**
* A snapshot of the transaction log, allows to iterate over all the transaction log operations.

View File

@ -140,7 +140,7 @@ public class FsChannelSnapshot implements Translog.Snapshot {
}
@Override public boolean release() throws ElasticSearchException {
raf.decreaseRefCount();
raf.decreaseRefCount(true);
return true;
}
}

View File

@ -136,7 +136,7 @@ public class FsStreamSnapshot implements Translog.Snapshot {
} catch (IOException e) {
// ignore
}
raf.decreaseRefCount();
raf.decreaseRefCount(true);
return true;
}
}

View File

@ -34,7 +34,6 @@ import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.index.translog.TranslogStreams;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
@ -97,11 +96,12 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
lastPosition = 0;
this.id = id + 1;
if (raf != null) {
raf.decreaseRefCount();
raf.decreaseRefCount(true);
}
try {
raf = new RafReference(new File(location, "translog-" + id));
} catch (FileNotFoundException e) {
raf.raf().setLength(0);
} catch (IOException e) {
raf = null;
throw new TranslogException(shardId, "translog not found", e);
}
@ -114,11 +114,13 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
lastPosition = 0;
this.id = id;
if (raf != null) {
raf.decreaseRefCount();
raf.decreaseRefCount(true);
}
try {
raf = new RafReference(new File(location, "translog-" + id));
} catch (FileNotFoundException e) {
// clean the file if it exists
raf.raf().setLength(0);
} catch (IOException e) {
raf = null;
throw new TranslogException(shardId, "translog not found", e);
}
@ -183,10 +185,10 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
}
}
@Override public void close() {
@Override public void close(boolean delete) {
synchronized (mutex) {
if (raf != null) {
raf.decreaseRefCount();
raf.decreaseRefCount(delete);
raf = null;
}
}

View File

@ -54,11 +54,13 @@ public class RafReference {
refCount.incrementAndGet();
}
public void decreaseRefCount() {
public void decreaseRefCount(boolean delete) {
if (refCount.decrementAndGet() <= 0) {
try {
raf.close();
file.delete();
if (delete) {
file.delete();
}
} catch (IOException e) {
// ignore
}

View File

@ -215,10 +215,6 @@ public final class InternalNode implements Node {
injector.getInstance(plugin).stop();
}
injector.getInstance(NodeEnvironment.class).close();
Injectors.close(injector);
logger.info("{{}}[{}]: stopped", Version.full(), JvmInfo.jvmInfo().pid());
return this;
@ -295,6 +291,9 @@ public final class InternalNode implements Node {
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
}
injector.getInstance(NodeEnvironment.class).close();
Injectors.close(injector);
logger.info("{{}}[{}]: closed", Version.full(), JvmInfo.jvmInfo().pid());
}

View File

@ -46,7 +46,7 @@ public abstract class AbstractSimpleTranslogTests {
}
@AfterMethod public void tearDown() {
translog.close();
translog.close(true);
}
protected abstract Translog create();

View File

@ -233,6 +233,17 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
logger.info("--> creating test index ...");
client("server1").admin().indices().prepareCreate("test").execute().actionGet();
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
logger.info("--> refreshing and checking count");
client("server1").admin().indices().prepareRefresh().execute().actionGet();
assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(0l));
logger.info("--> indexing 12345 docs");
for (long i = 0; i < 12345; i++) {
client("server1").prepareIndex("test", "type1", Long.toString(i))
@ -256,7 +267,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
startNode("server1");
logger.info("--> running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("--> done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.gateway.local;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests {
@AfterMethod public void closeNodes() throws Exception {
node("node1").stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("node1")).injector().getInstance(Gateway.class).reset();
closeAllNodes();
}
@Test public void testSingleNode() throws Exception {
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("node1")).injector().getInstance(Gateway.class).reset();
closeAllNodes();
Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build());
node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
node1.client().admin().indices().prepareFlush().execute().actionGet();
node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
node1.client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
closeNode("node1");
node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").build());
logger.info("Running Cluster Health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l));
}
}