diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/get/MultiGetRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/get/MultiGetRequestBuilder.java index e5302f13ef3..275b5897356 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/get/MultiGetRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/get/MultiGetRequestBuilder.java @@ -42,6 +42,20 @@ public class MultiGetRequestBuilder extends BaseRequestBuilder ids) { + for (String id : ids) { + request.add(index, type, id); + } + return this; + } + + public MultiGetRequestBuilder add(String index, @Nullable String type, String... ids) { + for (String id : ids) { + request.add(index, type, id); + } + return this; + } + public MultiGetRequestBuilder add(MultiGetRequest.Item item) { request.add(item); return this; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/simple/SimpleBloomCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/simple/SimpleBloomCache.java index d07152de376..13099eabefa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/simple/SimpleBloomCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/simple/SimpleBloomCache.java @@ -42,6 +42,7 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -202,6 +203,8 @@ public class SimpleBloomCache extends AbstractIndexComponent implements BloomCac } } catch (AlreadyClosedException e) { // ignore, we are getting closed + } catch (ClosedChannelException e) { + // ignore, we are getting closed } catch (Exception e) { logger.warn("failed to load bloom filter for [{}]", e, field); } finally { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/mget/MGetStress1.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/mget/MGetStress1.java new file mode 100644 index 00000000000..fa3f18dbea1 --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/mget/MGetStress1.java @@ -0,0 +1,104 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.stress.mget; + +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.Sets; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + */ +public class MGetStress1 { + + public static void main(String[] args) throws Exception { + final int NUMBER_OF_NODES = 2; + final int NUMBER_OF_DOCS = 50000; + final int MGET_BATCH = 1000; + + Node[] nodes = new Node[NUMBER_OF_NODES]; + for (int i = 0; i < nodes.length; i++) { + nodes[i] = NodeBuilder.nodeBuilder().node(); + } + + System.out.println("---> START Indexing initial data [" + NUMBER_OF_DOCS + "]"); + final Client client = nodes[0].client(); + for (int i = 0; i < NUMBER_OF_DOCS; i++) { + client.prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value").execute().actionGet(); + } + System.out.println("---> DONE Indexing initial data [" + NUMBER_OF_DOCS + "]"); + + final AtomicBoolean done = new AtomicBoolean(); + // start indexer + Thread indexer = new Thread(new Runnable() { + @Override public void run() { + while (!done.get()) { + client.prepareIndex("test", "type", Integer.toString(ThreadLocalRandom.current().nextInt(NUMBER_OF_DOCS))) + .setSource("field", "value").execute().actionGet(); + } + } + }); + indexer.start(); + System.out.println("---> Starting indexer"); + + // start the mget one + Thread mget = new Thread(new Runnable() { + @Override public void run() { + while (!done.get()) { + Set ids = Sets.newHashSet(); + for (int i = 0; i < MGET_BATCH; i++) { + ids.add(Integer.toString(ThreadLocalRandom.current().nextInt(NUMBER_OF_DOCS))); + } + //System.out.println("---> mget for [" + ids.size() + "]"); + MultiGetResponse response = client.prepareMultiGet().add("test", "type", ids).execute().actionGet(); + int expected = ids.size(); + int count = 0; + for (MultiGetItemResponse item : response) { + count++; + if (item.failed()) { + System.err.println("item failed... " + item.failure()); + } else { + boolean removed = ids.remove(item.id()); + if (!removed) { + System.err.println("got id twice " + item.id()); + } + } + } + if (expected != count) { + System.err.println("Expected [" + expected + "], got back [" + count + "]"); + } + } + } + }); + mget.start(); + System.out.println("---> Starting mget"); + + Thread.sleep(TimeValue.timeValueMinutes(10).millis()); + + done.set(true); + } +}