diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 4c4dbdb4cb9..f513ff162b4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -580,6 +580,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I out.flush(); CheckIndex.Status status = checkIndex.checkIndex(); if (!status.clean) { + if (state == IndexShardState.CLOSED) { + // ignore if closed.... + return; + } logger.warn("check index [failure]\n{}", new String(os.unsafeByteArray(), 0, os.size())); if (throwException) { throw new IndexShardException(shardId, "index check failure"); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java new file mode 100644 index 00000000000..98e09d4f95e --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/rollingrestart/RollingRestartStressTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.stress.rollingrestart; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.node.internal.InternalNode; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.index.query.xcontent.QueryBuilders.*; + +/** + * @author kimchy (shay.banon) + */ +public class RollingRestartStressTest { + + private final ESLogger logger = Loggers.getLogger(getClass()); + + private int numberOfNodes = 4; + + private long initialNumberOfDocs = 100000; + + private int indexers = 0; + + private TimeValue indexerThrottle = TimeValue.timeValueMillis(100); + + private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS; + + private TimeValue period = TimeValue.timeValueMinutes(20); + + private boolean clearNodeWork = true; + + private Node client; + + private AtomicLong indexCounter = new AtomicLong(); + + public RollingRestartStressTest numberOfNodes(int numberOfNodes) { + this.numberOfNodes = numberOfNodes; + return this; + } + + public RollingRestartStressTest initialNumberOfDocs(long initialNumberOfDocs) { + this.initialNumberOfDocs = initialNumberOfDocs; + return this; + } + + public RollingRestartStressTest indexers(int indexers) { + this.indexers = indexers; + return this; + } + + public RollingRestartStressTest indexerThrottle(TimeValue indexerThrottle) { + this.indexerThrottle = indexerThrottle; + return this; + } + + public RollingRestartStressTest period(TimeValue period) { + this.period = period; + return this; + } + + public RollingRestartStressTest cleanNodeWork(boolean cleanNodeWork) { + this.clearNodeWork = clearNodeWork; + return this; + } + + public RollingRestartStressTest settings(Settings settings) { + this.settings = settings; + return this; + } + + public void run() throws Exception { + Node[] nodes = new Node[numberOfNodes]; + for (int i = 0; i < nodes.length; i++) { + nodes[i] = NodeBuilder.nodeBuilder().settings(settings).node(); + } + client = NodeBuilder.nodeBuilder().settings(settings).client(true).node(); + + logger.info("********** [START] INDEXING INITIAL DOCS"); + for (long i = 0; i < initialNumberOfDocs; i++) { + indexDoc(); + } + logger.info("********** [DONE ] INDEXING INITIAL DOCS"); + + Indexer[] indexerThreads = new Indexer[indexers]; + for (int i = 0; i < indexerThreads.length; i++) { + indexerThreads[i] = new Indexer(); + } + for (int i = 0; i < indexerThreads.length; i++) { + indexerThreads[i].start(); + } + + long testStart = System.currentTimeMillis(); + + // start doing the rolling restart + int nodeIndex = 0; + while (true) { + File nodeWork = ((InternalNode) nodes[nodeIndex]).injector().getInstance(NodeEnvironment.class).nodeLocation(); + nodes[nodeIndex].close(); + if (clearNodeWork) { + FileSystemUtils.deleteRecursively(nodeWork); + } + nodes[nodeIndex] = NodeBuilder.nodeBuilder().settings(settings).node(); + + try { + ClusterHealthResponse clusterHealth = client.client().admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet(); + if (clusterHealth.timedOut()) { + logger.warn("timed out waiting for green status...."); + } + } catch (Exception e) { + logger.warn("failed to execute cluster health...."); + } + + if (++nodeIndex == nodes.length) { + nodeIndex = 0; + } + + if ((System.currentTimeMillis() - testStart) > period.millis()) { + logger.info("test finished"); + break; + } + } + + for (int i = 0; i < indexerThreads.length; i++) { + indexerThreads[i].closed = true; + } + + Thread.sleep(indexerThrottle.millis() + 10000); + + client.client().admin().indices().prepareRefresh().execute().actionGet(); + + // check the count + for (int i = 0; i < (nodes.length * 5); i++) { + CountResponse count = client.client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(); + logger.info("indexed [{}], count [{}]", count.count(), indexCounter.get()); + if (count.count() != indexCounter.get()) { + logger.warn("count does not match!"); + } + } + + client.close(); + for (Node node : nodes) { + node.close(); + } + } + + private class Indexer extends Thread { + + volatile boolean closed = false; + + @Override public void run() { + while (true) { + if (closed) { + return; + } + try { + indexDoc(); + Thread.sleep(indexerThrottle.millis()); + } catch (IOException e) { + logger.warn("failed to index", e); + } catch (InterruptedException e) { + break; + } + } + } + } + + private void indexDoc() throws IOException { + long id = indexCounter.incrementAndGet(); + client.client().prepareIndex("test", "type1") + .setSource(XContentFactory.jsonBuilder().startObject() + .field("field", "value" + id) + .endObject()) + .execute().actionGet(); + } + + public static void main(String[] args) throws Exception { + Settings settings = ImmutableSettings.settingsBuilder() + .put("index.shard.check_index", true) + .put("gateway.type", "none") + .build(); + + RollingRestartStressTest test = new RollingRestartStressTest() + .settings(settings) + .numberOfNodes(4) + .initialNumberOfDocs(100000) + .cleanNodeWork(true) + .indexers(5) + .indexerThrottle(TimeValue.timeValueMillis(100)) + .period(TimeValue.timeValueMinutes(10)); + + test.run(); + } +}