Realtime Get: Under high concurrent indexing and immediate get, a get might be missed while flushing, closes #1344.

This commit is contained in:
Shay Banon 2011-09-18 13:44:08 +03:00
parent 305cf4a567
commit b66a3b7c59
3 changed files with 109 additions and 5 deletions

View File

@ -783,7 +783,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
try {
boolean makeTransientCurrent = false;
if (flush.full()) {
rwl.writeLock().lock();
try {
@ -845,10 +845,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// we did not commit anything, revert to the old translog
translog.revertTransient();
} else {
translog.makeTransientCurrent();
makeTransientCurrent = true;
}
} else {
translog.makeTransientCurrent();
makeTransientCurrent = true;
}
} catch (Exception e) {
translog.revertTransient();
@ -864,13 +864,21 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
refreshVersioningTable(threadPool.estimatedTimeInMillis());
// we need to move transient to current only after we refresh
// so items added to current will still be around for realtime get
// when tans overrides it
if (makeTransientCurrent) {
translog.makeTransientCurrent();
}
try {
SegmentInfos infos = new SegmentInfos();
infos.read(store.directory());
lastCommittedSegmentInfos = infos;
} catch (Exception e) {
if (!closed) {
logger.warn("failed to read latest segment infos on flush", e);
}
}
} finally {
flushing.set(false);
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.stress.get;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class GetStressTest {
public static void main(String[] args) throws Exception {
Settings settings = ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
.build();
final int NUMBER_OF_NODES = 2;
final int NUMBER_OF_THREADS = 50;
final TimeValue TEST_TIME = TimeValue.parseTimeValue("10m", null);
Node[] nodes = new Node[NUMBER_OF_NODES];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeBuilder.nodeBuilder().settings(settings).node();
}
final Node client = NodeBuilder.nodeBuilder()
.settings(settings)
.client(true)
.node();
client.client().admin().indices().prepareCreate("test").execute().actionGet();
final AtomicBoolean done = new AtomicBoolean();
final AtomicLong idGenerator = new AtomicLong();
final AtomicLong counter = new AtomicLong();
Thread[] threads = new Thread[NUMBER_OF_THREADS];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new Runnable() {
@Override public void run() {
ThreadLocalRandom random = ThreadLocalRandom.current();
while (!done.get()) {
String id = String.valueOf(idGenerator.incrementAndGet());
client.client().prepareIndex("test", "type1", id)
.setSource("field", random.nextInt(100))
.execute().actionGet();
GetResponse getResponse = client.client().prepareGet("test", "type1", id)
//.setFields(Strings.EMPTY_ARRAY)
.execute().actionGet();
if (!getResponse.exists()) {
System.err.println("Failed to find " + id);
}
long count = counter.incrementAndGet();
if ((count % 10000) == 0) {
System.out.println("Executed " + count);
}
}
}
});
}
for (Thread thread : threads) {
thread.start();
}
Thread.sleep(TEST_TIME.millis());
System.out.println("test done.");
done.set(true);
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.test.stress.mget;
package org.elasticsearch.test.stress.get;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;