From d9979f8dfeceb3ef31e38fa74f928514c17c44c7 Mon Sep 17 00:00:00 2001 From: kimchy Date: Mon, 30 Aug 2010 01:25:45 +0300 Subject: [PATCH] more work on local gateway, start integration test it --- .../blobstore/BlobStoreIndexShardGateway.java | 6 ++ .../index/service/InternalIndexService.java | 2 +- .../index/translog/Translog.java | 2 +- .../index/translog/fs/FsChannelSnapshot.java | 2 +- .../index/translog/fs/FsStreamSnapshot.java | 2 +- .../index/translog/fs/FsTranslog.java | 16 ++-- .../index/translog/fs/RafReference.java | 6 +- .../node/internal/InternalNode.java | 7 +- .../translog/AbstractSimpleTranslogTests.java | 2 +- .../fs/AbstractSimpleIndexGatewayTests.java | 13 +++- .../SimpleRecoveryLocalGatewayTests.java | 75 +++++++++++++++++++ 11 files changed, 114 insertions(+), 19 deletions(-) create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index 83dd4252da9..06984c069bf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -378,6 +378,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo CommitPoints commitPoints = new CommitPoints(commitPointsList); if (commitPoints.commits().isEmpty()) { + // no commit points, clean the store just so we won't recover wrong files + try { + indexShard.store().deleteContent(); + } catch (IOException e) { + logger.warn("failed to clean store before starting shard", e); + } recoveryStatus.index().startTime(System.currentTimeMillis()); recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); recoveryStatus.translog().startTime(System.currentTimeMillis()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index f2cfdb13cbe..e8acca21e50 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -347,7 +347,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde } try { // now we can close the translog - shardInjector.getInstance(Translog.class).close(); + shardInjector.getInstance(Translog.class).close(delete); } catch (Exception e) { // ignore } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index 02815fff8d0..4c253376a93 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -90,7 +90,7 @@ public interface Translog extends IndexShardComponent { /** * Closes the transaction log. */ - void close(); + void close(boolean delete); /** * A snapshot of the transaction log, allows to iterate over all the transaction log operations. diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java index 783f341c56f..1f2359d6291 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java @@ -140,7 +140,7 @@ public class FsChannelSnapshot implements Translog.Snapshot { } @Override public boolean release() throws ElasticSearchException { - raf.decreaseRefCount(); + raf.decreaseRefCount(true); return true; } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java index 414bd6d4c3a..417d6334477 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java @@ -136,7 +136,7 @@ public class FsStreamSnapshot implements Translog.Snapshot { } catch (IOException e) { // ignore } - raf.decreaseRefCount(); + raf.decreaseRefCount(true); return true; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 6f73443a330..7bce1947972 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -34,7 +34,6 @@ import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.index.translog.TranslogStreams; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; @@ -97,11 +96,12 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog lastPosition = 0; this.id = id + 1; if (raf != null) { - raf.decreaseRefCount(); + raf.decreaseRefCount(true); } try { raf = new RafReference(new File(location, "translog-" + id)); - } catch (FileNotFoundException e) { + raf.raf().setLength(0); + } catch (IOException e) { raf = null; throw new TranslogException(shardId, "translog not found", e); } @@ -114,11 +114,13 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog lastPosition = 0; this.id = id; if (raf != null) { - raf.decreaseRefCount(); + raf.decreaseRefCount(true); } try { raf = new RafReference(new File(location, "translog-" + id)); - } catch (FileNotFoundException e) { + // clean the file if it exists + raf.raf().setLength(0); + } catch (IOException e) { raf = null; throw new TranslogException(shardId, "translog not found", e); } @@ -183,10 +185,10 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } } - @Override public void close() { + @Override public void close(boolean delete) { synchronized (mutex) { if (raf != null) { - raf.decreaseRefCount(); + raf.decreaseRefCount(delete); raf = null; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/RafReference.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/RafReference.java index 1c04824d761..49e4d424b72 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/RafReference.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/RafReference.java @@ -54,11 +54,13 @@ public class RafReference { refCount.incrementAndGet(); } - public void decreaseRefCount() { + public void decreaseRefCount(boolean delete) { if (refCount.decrementAndGet() <= 0) { try { raf.close(); - file.delete(); + if (delete) { + file.delete(); + } } catch (IOException e) { // ignore } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 667494acf71..ecdadd9f0fe 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -215,10 +215,6 @@ public final class InternalNode implements Node { injector.getInstance(plugin).stop(); } - injector.getInstance(NodeEnvironment.class).close(); - - Injectors.close(injector); - logger.info("{{}}[{}]: stopped", Version.full(), JvmInfo.jvmInfo().pid()); return this; @@ -295,6 +291,9 @@ public final class InternalNode implements Node { logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint()); } + injector.getInstance(NodeEnvironment.class).close(); + Injectors.close(injector); + logger.info("{{}}[{}]: closed", Version.full(), JvmInfo.jvmInfo().pid()); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java index 823648e3e60..12d6a7f2ad8 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java @@ -46,7 +46,7 @@ public abstract class AbstractSimpleTranslogTests { } @AfterMethod public void tearDown() { - translog.close(); + translog.close(true); } protected abstract Translog create(); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java index 4e44eb8d7d1..65695127fe5 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java @@ -233,6 +233,17 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests logger.info("--> creating test index ..."); client("server1").admin().indices().prepareCreate("test").execute().actionGet(); + logger.info("Running Cluster Health (wait for the shards to startup)"); + ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); + + + logger.info("--> refreshing and checking count"); + client("server1").admin().indices().prepareRefresh().execute().actionGet(); + assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(0l)); + logger.info("--> indexing 12345 docs"); for (long i = 0; i < 12345; i++) { client("server1").prepareIndex("test", "type1", Long.toString(i)) @@ -256,7 +267,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests startNode("server1"); logger.info("--> running Cluster Health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("--> done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java new file mode 100644 index 00000000000..6eae0f0df70 --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/SimpleRecoveryLocalGatewayTests.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.gateway.local; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +import static org.elasticsearch.client.Requests.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.elasticsearch.common.xcontent.XContentFactory.*; +import static org.elasticsearch.index.query.xcontent.QueryBuilders.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +public class SimpleRecoveryLocalGatewayTests extends AbstractNodesTests { + + @AfterMethod public void closeNodes() throws Exception { + node("node1").stop(); + // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well + ((InternalNode) node("node1")).injector().getInstance(Gateway.class).reset(); + closeAllNodes(); + } + + @Test public void testSingleNode() throws Exception { + buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); + // since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well + ((InternalNode) node("node1")).injector().getInstance(Gateway.class).reset(); + closeAllNodes(); + + Node node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").put("index.number_of_shards", 1).build()); + node1.client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); + node1.client().admin().indices().prepareFlush().execute().actionGet(); + node1.client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet(); + node1.client().admin().indices().prepareRefresh().execute().actionGet(); + + assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l)); + + closeNode("node1"); + node1 = startNode("node1", settingsBuilder().put("gateway.type", "local").build()); + + logger.info("Running Cluster Health (wait for the shards to startup)"); + ClusterHealthResponse clusterHealth = client("node1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); + + assertThat(node1.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(2l)); + } +}