Improve stability of RobinEngineIntegrationTest by reducing direct memory usage of the test env
This commit is contained in:
parent
c95c7096e5
commit
f6c2ee0ab2
|
@ -17,6 +17,7 @@ package org.apache.lucene.store.bytebuffer;
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.lucene.store.*;
|
import org.apache.lucene.store.*;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
|
@ -48,6 +49,7 @@ public class ByteBufferDirectory extends Directory {
|
||||||
|
|
||||||
final AtomicLong sizeInBytes = new AtomicLong();
|
final AtomicLong sizeInBytes = new AtomicLong();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a new directory using {@link PlainByteBufferAllocator}.
|
* Constructs a new directory using {@link PlainByteBufferAllocator}.
|
||||||
*/
|
*/
|
||||||
|
@ -112,10 +114,28 @@ public class ByteBufferDirectory extends Directory {
|
||||||
return file.getLength();
|
return file.getLength();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final static ImmutableSet<String> SMALL_FILES_SUFFIXES = ImmutableSet.of(
|
||||||
|
"del", // 1 bit per doc
|
||||||
|
"cfe", // compound file metadata
|
||||||
|
"si", // segment info
|
||||||
|
"fnm" // field info (metadata like omit norms etc)
|
||||||
|
);
|
||||||
|
|
||||||
|
private static boolean isSmallFile(String fileName) {
|
||||||
|
if (fileName.startsWith("segments")) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (fileName.lastIndexOf('.') > 0) {
|
||||||
|
String suffix = fileName.substring(fileName.lastIndexOf('.') + 1);
|
||||||
|
return SMALL_FILES_SUFFIXES.contains(suffix);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||||
ByteBufferAllocator.Type allocatorType = ByteBufferAllocator.Type.LARGE;
|
ByteBufferAllocator.Type allocatorType = ByteBufferAllocator.Type.LARGE;
|
||||||
if (name.contains("segments") || name.endsWith(".del")) {
|
if (isSmallFile(name)) {
|
||||||
allocatorType = ByteBufferAllocator.Type.SMALL;
|
allocatorType = ByteBufferAllocator.Type.SMALL;
|
||||||
}
|
}
|
||||||
ByteBufferFileOutput file = new ByteBufferFileOutput(this, allocator.sizeInBytes(allocatorType));
|
ByteBufferFileOutput file = new ByteBufferFileOutput(this, allocator.sizeInBytes(allocatorType));
|
||||||
|
|
|
@ -19,11 +19,15 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.engine.robin;
|
package org.elasticsearch.index.engine.robin;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||||
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
|
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
|
||||||
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
|
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
|
||||||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
|
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
|
||||||
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.index.engine.Segment;
|
import org.elasticsearch.index.engine.Segment;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.test.AbstractIntegrationTest;
|
import org.elasticsearch.test.AbstractIntegrationTest;
|
||||||
|
@ -82,15 +86,22 @@ public class RobinEngineIntegrationTest extends AbstractIntegrationTest {
|
||||||
}
|
}
|
||||||
@Test
|
@Test
|
||||||
public void test4093() {
|
public void test4093() {
|
||||||
|
cluster().ensureAtMostNumNodes(1); // only one node Netty uses lots of native mem as well
|
||||||
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder()
|
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder()
|
||||||
.put("index.store.type", "memory")
|
.put("index.store.type", "memory")
|
||||||
|
.put("cache.memory.large_cache_size", new ByteSizeValue(10, ByteSizeUnit.MB)) // no need to cache a lot
|
||||||
.put("index.number_of_shards", "1")
|
.put("index.number_of_shards", "1")
|
||||||
.put("index.number_of_replicas", "0")
|
.put("index.number_of_replicas", "0")
|
||||||
.put("gateway.type", "none")
|
.put("gateway.type", "none")
|
||||||
.put("http.enabled", false)
|
|
||||||
.put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, randomBoolean())
|
.put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, randomBoolean())
|
||||||
.put("index.warmer.enabled", false)
|
.put("index.warmer.enabled", false)
|
||||||
.build()).get());
|
.build()).get());
|
||||||
|
NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().setJvm(true).get();
|
||||||
|
NodeInfo[] nodes = nodeInfos.getNodes();
|
||||||
|
for (NodeInfo info : nodes) {
|
||||||
|
ByteSizeValue directMemoryMax = info.getJvm().getMem().getDirectMemoryMax();
|
||||||
|
logger.info(" JVM max direct memory for node [{}] is set to [{}]", info.getNode().getName(), directMemoryMax);
|
||||||
|
}
|
||||||
final int iters = between(500, 1000);
|
final int iters = between(500, 1000);
|
||||||
for (int i = 0; i < iters; i++) {
|
for (int i = 0; i < iters; i++) {
|
||||||
client().prepareIndex("test", "type1")
|
client().prepareIndex("test", "type1")
|
||||||
|
@ -99,6 +110,7 @@ public class RobinEngineIntegrationTest extends AbstractIntegrationTest {
|
||||||
.execute()
|
.execute()
|
||||||
.actionGet();
|
.actionGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).get(), iters);
|
assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).get(), iters);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -217,14 +217,19 @@ public class TestCluster implements Closeable, Iterable<Client> {
|
||||||
if (nodes.size() <= num) {
|
if (nodes.size() <= num) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Collection<NodeAndClient> values = nodes.values();
|
// prevent killing the master if possible
|
||||||
Iterator<NodeAndClient> limit = Iterators.limit(values.iterator(), nodes.size() - num);
|
final Iterator<NodeAndClient> values = num == 0 ? nodes.values().iterator() : Iterators.filter(nodes.values().iterator(), Predicates.not(new MasterNodePredicate(getMasterName())));
|
||||||
|
final Iterator<NodeAndClient> limit = Iterators.limit(values, nodes.size() - num);
|
||||||
logger.info("reducing cluster size from {} to {}", nodes.size() - num, num);
|
logger.info("reducing cluster size from {} to {}", nodes.size() - num, num);
|
||||||
|
Set<NodeAndClient> nodesToRemove = new HashSet<NodeAndClient>();
|
||||||
while (limit.hasNext()) {
|
while (limit.hasNext()) {
|
||||||
NodeAndClient next = limit.next();
|
NodeAndClient next = limit.next();
|
||||||
limit.remove();
|
nodesToRemove.add(next);
|
||||||
next.close();
|
next.close();
|
||||||
}
|
}
|
||||||
|
for (NodeAndClient toRemove : nodesToRemove) {
|
||||||
|
nodes.remove(toRemove.name);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private NodeAndClient buildNode(Settings settings) {
|
private NodeAndClient buildNode(Settings settings) {
|
||||||
|
|
Loading…
Reference in New Issue