From b66a3b7c5974d2d1b9a843817713528aac5ddb59 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sun, 18 Sep 2011 13:44:08 +0300 Subject: [PATCH] Realtime Get: Under high concurrent indexing and immediate get, a get might be missed while flushing, closes #1344. --- .../index/engine/robin/RobinEngine.java | 16 +++- .../test/stress/get/GetStressTest.java | 96 +++++++++++++++++++ .../stress/{mget => get}/MGetStress1.java | 2 +- 3 files changed, 109 insertions(+), 5 deletions(-) create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/stress/get/GetStressTest.java rename modules/test/integration/src/test/java/org/elasticsearch/test/stress/{mget => get}/MGetStress1.java (98%) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 8e1b4550162..5fb3356b5ce 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -783,7 +783,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } try { - + boolean makeTransientCurrent = false; if (flush.full()) { rwl.writeLock().lock(); try { @@ -845,10 +845,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { // we did not commit anything, revert to the old translog translog.revertTransient(); } else { - translog.makeTransientCurrent(); + makeTransientCurrent = true; } } else { - translog.makeTransientCurrent(); + makeTransientCurrent = true; } } catch (Exception e) { translog.revertTransient(); @@ -864,12 +864,20 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } refreshVersioningTable(threadPool.estimatedTimeInMillis()); + // we need to move transient to current only after we refresh + // so items added to current will still be around for realtime get + // when tans overrides it + if (makeTransientCurrent) { + translog.makeTransientCurrent(); + } try { SegmentInfos infos = new SegmentInfos(); infos.read(store.directory()); lastCommittedSegmentInfos = infos; } catch (Exception e) { - logger.warn("failed to read latest segment infos on flush", e); + if (!closed) { + logger.warn("failed to read latest segment infos on flush", e); + } } } finally { flushing.set(false); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/get/GetStressTest.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/get/GetStressTest.java new file mode 100644 index 00000000000..4220a57f20f --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/get/GetStressTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.stress.get; + +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class GetStressTest { + + public static void main(String[] args) throws Exception { + Settings settings = ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 1) + .build(); + + final int NUMBER_OF_NODES = 2; + final int NUMBER_OF_THREADS = 50; + final TimeValue TEST_TIME = TimeValue.parseTimeValue("10m", null); + + Node[] nodes = new Node[NUMBER_OF_NODES]; + for (int i = 0; i < nodes.length; i++) { + nodes[i] = NodeBuilder.nodeBuilder().settings(settings).node(); + } + + final Node client = NodeBuilder.nodeBuilder() + .settings(settings) + .client(true) + .node(); + + client.client().admin().indices().prepareCreate("test").execute().actionGet(); + + final AtomicBoolean done = new AtomicBoolean(); + final AtomicLong idGenerator = new AtomicLong(); + final AtomicLong counter = new AtomicLong(); + + Thread[] threads = new Thread[NUMBER_OF_THREADS]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(new Runnable() { + @Override public void run() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + while (!done.get()) { + String id = String.valueOf(idGenerator.incrementAndGet()); + client.client().prepareIndex("test", "type1", id) + .setSource("field", random.nextInt(100)) + .execute().actionGet(); + + GetResponse getResponse = client.client().prepareGet("test", "type1", id) + //.setFields(Strings.EMPTY_ARRAY) + .execute().actionGet(); + if (!getResponse.exists()) { + System.err.println("Failed to find " + id); + } + + long count = counter.incrementAndGet(); + if ((count % 10000) == 0) { + System.out.println("Executed " + count); + } + } + } + }); + } + for (Thread thread : threads) { + thread.start(); + } + + Thread.sleep(TEST_TIME.millis()); + + System.out.println("test done."); + done.set(true); + } +} \ No newline at end of file diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/mget/MGetStress1.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/get/MGetStress1.java similarity index 98% rename from modules/test/integration/src/test/java/org/elasticsearch/test/stress/mget/MGetStress1.java rename to modules/test/integration/src/test/java/org/elasticsearch/test/stress/get/MGetStress1.java index fa3f18dbea1..f7262f66d9e 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/mget/MGetStress1.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/get/MGetStress1.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.test.stress.mget; +package org.elasticsearch.test.stress.get; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse;