diff --git a/src/main/java/org/apache/lucene/store/bytebuffer/ByteBufferDirectory.java b/src/main/java/org/apache/lucene/store/bytebuffer/ByteBufferDirectory.java index c6c7ea922b3..7c4ef6dc41c 100644 --- a/src/main/java/org/apache/lucene/store/bytebuffer/ByteBufferDirectory.java +++ b/src/main/java/org/apache/lucene/store/bytebuffer/ByteBufferDirectory.java @@ -17,6 +17,7 @@ package org.apache.lucene.store.bytebuffer; * limitations under the License. */ +import com.google.common.collect.ImmutableSet; import org.apache.lucene.store.*; import java.io.FileNotFoundException; @@ -48,6 +49,7 @@ public class ByteBufferDirectory extends Directory { final AtomicLong sizeInBytes = new AtomicLong(); + /** * Constructs a new directory using {@link PlainByteBufferAllocator}. */ @@ -112,10 +114,28 @@ public class ByteBufferDirectory extends Directory { return file.getLength(); } + private final static ImmutableSet SMALL_FILES_SUFFIXES = ImmutableSet.of( + "del", // 1 bit per doc + "cfe", // compound file metadata + "si", // segment info + "fnm" // field info (metadata like omit norms etc) + ); + + private static boolean isSmallFile(String fileName) { + if (fileName.startsWith("segments")) { + return true; + } + if (fileName.lastIndexOf('.') > 0) { + String suffix = fileName.substring(fileName.lastIndexOf('.') + 1); + return SMALL_FILES_SUFFIXES.contains(suffix); + } + return false; + } + @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ByteBufferAllocator.Type allocatorType = ByteBufferAllocator.Type.LARGE; - if (name.contains("segments") || name.endsWith(".del")) { + if (isSmallFile(name)) { allocatorType = ByteBufferAllocator.Type.SMALL; } ByteBufferFileOutput file = new ByteBufferFileOutput(this, allocator.sizeInBytes(allocatorType)); diff --git a/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineIntegrationTest.java b/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineIntegrationTest.java index fd9f5515ce5..4fe20eb95a6 100644 --- a/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineIntegrationTest.java +++ b/src/test/java/org/elasticsearch/index/engine/robin/RobinEngineIntegrationTest.java @@ -19,11 +19,15 @@ package org.elasticsearch.index.engine.robin; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.AbstractIntegrationTest; @@ -82,15 +86,22 @@ public class RobinEngineIntegrationTest extends AbstractIntegrationTest { } @Test public void test4093() { + cluster().ensureAtMostNumNodes(1); // only one node Netty uses lots of native mem as well assertAcked(prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder() .put("index.store.type", "memory") + .put("cache.memory.large_cache_size", new ByteSizeValue(10, ByteSizeUnit.MB)) // no need to cache a lot .put("index.number_of_shards", "1") .put("index.number_of_replicas", "0") .put("gateway.type", "none") - .put("http.enabled", false) .put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, randomBoolean()) .put("index.warmer.enabled", false) .build()).get()); + NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().setJvm(true).get(); + NodeInfo[] nodes = nodeInfos.getNodes(); + for (NodeInfo info : nodes) { + ByteSizeValue directMemoryMax = info.getJvm().getMem().getDirectMemoryMax(); + logger.info(" JVM max direct memory for node [{}] is set to [{}]", info.getNode().getName(), directMemoryMax); + } final int iters = between(500, 1000); for (int i = 0; i < iters; i++) { client().prepareIndex("test", "type1") @@ -99,6 +110,7 @@ public class RobinEngineIntegrationTest extends AbstractIntegrationTest { .execute() .actionGet(); } + assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).get(), iters); } } diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index f0ce94bbb2f..2756ba8bde1 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -217,14 +217,19 @@ public class TestCluster implements Closeable, Iterable { if (nodes.size() <= num) { return; } - Collection values = nodes.values(); - Iterator limit = Iterators.limit(values.iterator(), nodes.size() - num); + // prevent killing the master if possible + final Iterator values = num == 0 ? nodes.values().iterator() : Iterators.filter(nodes.values().iterator(), Predicates.not(new MasterNodePredicate(getMasterName()))); + final Iterator limit = Iterators.limit(values, nodes.size() - num); logger.info("reducing cluster size from {} to {}", nodes.size() - num, num); + Set nodesToRemove = new HashSet(); while (limit.hasNext()) { NodeAndClient next = limit.next(); - limit.remove(); + nodesToRemove.add(next); next.close(); } + for (NodeAndClient toRemove : nodesToRemove) { + nodes.remove(toRemove.name); + } } private NodeAndClient buildNode(Settings settings) {