Page-based cache recycling.

Refactor cache recycling so that it only caches large arrays (pages) that can
later be used to build more complex data structures such as hash tables.

 - QueueRecycler now takes a limit like other non-trivial recyclers.
 - New PageCacheRecycler (inspired by CacheRecycler) can cache
   byte[], int[], long[], double[] or Object[] arrays using a fixed amount of
   memory (either globally or per-thread depending on the Recycler impl, eg.
   queue is global while thread_local is per-thread).
 - Paged arrays in o.e.common.util can now optionally take a PageCacheRecycler
   to reuse existing pages (see the usage sketch after this list).
 - All aggregators' data structures now use PageCacheRecycler:
   - for all arrays (counts, mins, maxes, ...)
   - LongHash can now take a PageCacheRecycler
   - there is a new BytesRefHash (inspired by Lucene but still quite different;
     for instance it cheats on BytesRef comparisons by using Unsafe)
     that also takes a PageCacheRecycler
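
For illustration only (this sketch is not part of the commit), here is roughly how the new
pieces fit together, assuming a PageCacheRecycler instance is already available (it is
normally injected); the class and method names are the ones introduced or changed in this diff:

import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

class PageRecyclingSketch {
    static void example(PageCacheRecycler recycler) {
        // Borrow a raw page; passing true zeroes it if it came out of the cache.
        Recycler.V<byte[]> page = recycler.bytePage(true);
        try {
            page.v()[0] = 1; // work with the 16KB page
        } finally {
            page.release(); // hand the page back to the recycler
        }

        // Paged arrays reuse cached pages when a recycler is provided.
        LongArray counts = BigArrays.newLongArray(1024, recycler, true);
        counts = BigArrays.grow(counts, 4096);  // grows by whole pages
        counts.set(0, counts.get(0) + 1);
        Releasables.release(counts);            // returns the underlying pages to the recycler
    }
}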

Close #4557
Adrien Grand 2013-12-03 16:02:31 +01:00
parent 6481a2fde8
commit 4271d573d6
69 changed files with 2274 additions and 355 deletions

View File

@ -977,6 +977,9 @@
<exclude>org/elasticsearch/bootstrap/Bootstrap.class</exclude>
<exclude>org/elasticsearch/Version.class</exclude>
<!-- end excludes for valid system-out -->
<!-- start excludes for Unsafe -->
<exclude>org/elasticsearch/common/util/UnsafeUtils.class</exclude>
<!-- end excludes for Unsafe -->
</excludes>
<bundledSignatures>
<!-- This will automatically choose the right signatures based on 'maven.compiler.target': -->

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -66,12 +67,15 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
private final CacheRecycler cacheRecycler;
private final PageCacheRecycler pageCacheRecycler;
@Inject
public TransportValidateQueryAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ScriptService scriptService, CacheRecycler cacheRecycler) {
public TransportValidateQueryAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
}
@Override
@ -180,7 +184,7 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct
SearchContext.setCurrent(new DefaultSearchContext(0,
new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()),
null, indexShard.acquireSearcher("validate_query"), indexService, indexShard,
scriptService, cacheRecycler));
scriptService, cacheRecycler, pageCacheRecycler));
try {
ParsedQuery parsedQuery = queryParserService.parseQuery(request.source());
valid = true;

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -67,13 +68,16 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
private final CacheRecycler cacheRecycler;
private final PageCacheRecycler pageCacheRecycler;
@Inject
public TransportCountAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ScriptService scriptService, CacheRecycler cacheRecycler) {
IndicesService indicesService, ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
}
@Override
@ -164,7 +168,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
scriptService, cacheRecycler);
scriptService, cacheRecycler, pageCacheRecycler);
SearchContext.setCurrent(context);
try {

View File

@ -24,6 +24,7 @@ import org.apache.lucene.search.Filter;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
@ -52,14 +53,17 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
private final ScriptService scriptService;
private final CacheRecycler cacheRecycler;
private final PageCacheRecycler pageCacheRecycler;
@Inject
public TransportShardDeleteByQueryAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ScriptService scriptService, CacheRecycler cacheRecycler) {
ShardStateAction shardStateAction, ScriptService scriptService, CacheRecycler cacheRecycler,
PageCacheRecycler pageCacheRecycler) {
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
}
@Override
@ -109,7 +113,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler));
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.PRIMARY);
@ -131,7 +135,8 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService, cacheRecycler));
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
cacheRecycler, pageCacheRecycler));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.REPLICA);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -63,14 +64,17 @@ public class TransportExplainAction extends TransportShardSingleOperationAction<
private final CacheRecycler cacheRecycler;
private final PageCacheRecycler pageCacheRecycler;
@Inject
public TransportExplainAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService,
ScriptService scriptService, CacheRecycler cacheRecycler) {
ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
}
@Override
@ -114,7 +118,7 @@ public class TransportExplainAction extends TransportShardSingleOperationAction<
.filteringAliases(request.filteringAlias())
.nowInMillis(request.nowInMillis),
null, result.searcher(), indexService, indexShard,
scriptService, cacheRecycler
scriptService, cacheRecycler, pageCacheRecycler
);
SearchContext.setCurrent(context);

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cache.recycler;
import com.carrotsearch.hppc.*;
import com.google.common.base.Strings;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -62,7 +63,7 @@ public class CacheRecycler extends AbstractComponent {
@Inject
public CacheRecycler(Settings settings) {
super(settings);
String type = settings.get("type", Type.SOFT_THREAD_LOCAL.name());
final Type type = Type.parse(settings.get("type"));
int limit = settings.getAsInt("limit", 10);
int smartSize = settings.getAsInt("smart_size", 1024);
@ -252,27 +253,10 @@ public class CacheRecycler extends AbstractComponent {
return sizing > 0 ? sizing : 256;
}
private <T> Recycler<T> build(String type, int limit, int smartSize, Recycler.C<T> c) {
private <T> Recycler<T> build(Type type, int limit, int smartSize, Recycler.C<T> c) {
Recycler<T> recycler;
try {
// default to soft_thread_local
final Type t = type == null ? Type.SOFT_THREAD_LOCAL : Type.valueOf(type.toUpperCase(Locale.ROOT));
switch (t) {
case SOFT_THREAD_LOCAL:
recycler = new SoftThreadLocalRecycler<T>(c, limit);
break;
case THREAD_LOCAL:
recycler = new ThreadLocalRecycler<T>(c, limit);
break;
case QUEUE:
recycler = new QueueRecycler<T>(c);
break;
case NONE:
recycler = new NoneRecycler<T>(c);
break;
default:
throw new ElasticSearchIllegalArgumentException("no type support [" + type + "] for recycler");
}
recycler = type.build(c, limit);
if (smartSize > 0) {
recycler = new Recycler.Sizing<T>(recycler, smartSize);
}
@ -284,9 +268,53 @@ public class CacheRecycler extends AbstractComponent {
}
public static enum Type {
SOFT_THREAD_LOCAL,
THREAD_LOCAL,
QUEUE,
NONE;
SOFT_THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new SoftThreadLocalRecycler<T>(c, limit);
}
@Override
boolean perThread() {
return true;
}
},
THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new ThreadLocalRecycler<T>(c, limit);
}
@Override
boolean perThread() {
return true;
}
},
QUEUE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new QueueRecycler<T>(c, limit);
}
},
NONE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new NoneRecycler<T>(c);
}
};
public static Type parse(String type) {
if (Strings.isNullOrEmpty(type)) {
return SOFT_THREAD_LOCAL;
}
try {
return Type.valueOf(type.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new ElasticSearchIllegalArgumentException("no type support [" + type + "]");
}
}
abstract <T> Recycler<T> build(Recycler.C<T> c, int limit);
boolean perThread() {
return false;
}
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cache.recycler;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
/**
*/
public class DefaultPageCacheRecyclerModule extends AbstractModule {
private final Settings settings;
public DefaultPageCacheRecyclerModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(PageCacheRecycler.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,212 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cache.recycler;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cache.recycler.CacheRecycler.Type;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.recycler.NoneRecycler;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import java.util.Arrays;
/** A recycler of fixed-size pages. */
public class PageCacheRecycler extends AbstractComponent {
public static final String TYPE = "page.type";
public static final String LIMIT_HEAP = "page.limit.heap";
public static final String LIMIT_PER_THREAD = "page.limit.per_thread";
public static final String WEIGHT = "page.weight";
private final Recycler<byte[]> bytePage;
private final Recycler<int[]> intPage;
private final Recycler<long[]> longPage;
private final Recycler<double[]> doublePage;
private final Recycler<Object[]> objectPage;
public void close() {
bytePage.close();
intPage.close();
longPage.close();
doublePage.close();
objectPage.close();
}
private static int maximumSearchThreadPoolSize(ThreadPool threadPool, Settings settings) {
final ThreadPoolInfo infos = threadPool.info();
for (ThreadPool.Info info : infos) {
if (info.getName().equals(ThreadPool.Names.SEARCH)) {
final int maxSize = info.getMax();
if (maxSize <= 0) {
// happens with cached thread pools, let's assume there are at most 3x ${number of processors} threads
return 3 * EsExecutors.boundedNumberOfProcessors(settings);
} else {
return maxSize;
}
}
}
throw new ElasticSearchIllegalStateException("Couldn't find the [" + ThreadPool.Names.SEARCH + "] thread pool");
}
// return the maximum number of pages that may be cached depending on
// - limit: the total amount of memory available
// - pageSize: the size of a single page
// - weight: the weight for this data type
// - totalWeight: the sum of all weights
private static int maxCount(long limit, long pageSize, double weight, double totalWeight) {
return (int) (weight / totalWeight * limit / pageSize);
}
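// Illustrative worked example, not part of the original commit: with a 41mb limit,
// weight 1 and totalWeight 4.1 (the assumed defaults below), 16384-byte pages would be
// capped at maxCount(41mb, 16384, 1, 4.1) = (1 / 4.1) * 42991616 / 16384 = 640 cached pages.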
@Inject
public PageCacheRecycler(Settings settings, ThreadPool threadPool) {
super(settings);
final Type type = Type.parse(componentSettings.get(TYPE));
long limit = componentSettings.getAsMemory(LIMIT_HEAP, "10%").bytes();
if (type.perThread()) {
final long limitPerThread = componentSettings.getAsBytesSize(LIMIT_PER_THREAD, new ByteSizeValue(-1)).bytes();
if (limitPerThread != -1) {
// if the per_thread limit is set, it has precedence
limit = limitPerThread;
} else {
// divide memory equally to all search threads
limit /= maximumSearchThreadPoolSize(threadPool, settings);
}
}
// We have a global amount of memory that we need to divide across data types.
// Since some types are more useful than other ones we give them different weights.
// Trying to store all of them in a single stack would be problematic because eg.
// a workload could fill the recycler with only byte[] pages and then another
// workload that would work with double[] pages couldn't recycle them because there
// is no space left in the stack/queue. LRU/LFU policies are not an option either
// because they would make obtain/release too costly: we really need constant-time
// operations.
// Ultimately a better solution would be to only store one kind of data and have the
// ability to interpret it either as a source of bytes, doubles, longs, etc. eg. thanks
// to direct ByteBuffers or sun.misc.Unsafe on a byte[] but this would have other issues
// that would need to be addressed such as garbage collection of native memory or safety
// of Unsafe writes.
final double bytesWeight = componentSettings.getAsDouble(WEIGHT + ".bytes", 1d);
final double intsWeight = componentSettings.getAsDouble(WEIGHT + ".ints", 1d);
final double longsWeight = componentSettings.getAsDouble(WEIGHT + ".longs", 1d);
final double doublesWeight = componentSettings.getAsDouble(WEIGHT + ".doubles", 1d);
// object pages are less useful to us so we give them a lower weight by default
final double objectsWeight = componentSettings.getAsDouble(WEIGHT + ".objects", 0.1d);
final double totalWeight = bytesWeight + intsWeight + longsWeight + doublesWeight + objectsWeight;
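// Note added for illustration (not part of the original commit): with these assumed default
// weights (1/1/1/1/0.1, totalWeight 4.1), each primitive page type is allotted roughly 24.4%
// of the memory limit and object pages roughly 2.4%.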
bytePage = build(type, maxCount(limit, BigArrays.BYTE_PAGE_SIZE, bytesWeight, totalWeight), new Recycler.C<byte[]>() {
@Override
public byte[] newInstance(int sizing) {
return new byte[BigArrays.BYTE_PAGE_SIZE];
}
@Override
public void clear(byte[] value) {}
});
intPage = build(type, maxCount(limit, BigArrays.INT_PAGE_SIZE, intsWeight, totalWeight), new Recycler.C<int[]>() {
@Override
public int[] newInstance(int sizing) {
return new int[BigArrays.INT_PAGE_SIZE];
}
@Override
public void clear(int[] value) {}
});
longPage = build(type, maxCount(limit, BigArrays.LONG_PAGE_SIZE, longsWeight, totalWeight), new Recycler.C<long[]>() {
@Override
public long[] newInstance(int sizing) {
return new long[BigArrays.LONG_PAGE_SIZE];
}
@Override
public void clear(long[] value) {}
});
doublePage = build(type, maxCount(limit, BigArrays.DOUBLE_PAGE_SIZE, doublesWeight, totalWeight), new Recycler.C<double[]>() {
@Override
public double[] newInstance(int sizing) {
return new double[BigArrays.DOUBLE_PAGE_SIZE];
}
@Override
public void clear(double[] value) {}
});
objectPage = build(type, maxCount(limit, BigArrays.OBJECT_PAGE_SIZE, objectsWeight, totalWeight), new Recycler.C<Object[]>() {
@Override
public Object[] newInstance(int sizing) {
return new Object[BigArrays.OBJECT_PAGE_SIZE];
}
@Override
public void clear(Object[] value) {
Arrays.fill(value, null); // we need to remove the strong refs on the objects stored in the array
}
});
}
public Recycler.V<byte[]> bytePage(boolean clear) {
final Recycler.V<byte[]> v = bytePage.obtain();
if (v.isRecycled() && clear) {
Arrays.fill(v.v(), (byte) 0);
}
return v;
}
public Recycler.V<int[]> intPage(boolean clear) {
final Recycler.V<int[]> v = intPage.obtain();
if (v.isRecycled() && clear) {
Arrays.fill(v.v(), 0);
}
return v;
}
public Recycler.V<long[]> longPage(boolean clear) {
final Recycler.V<long[]> v = longPage.obtain();
if (v.isRecycled() && clear) {
Arrays.fill(v.v(), 0L);
}
return v;
}
public Recycler.V<double[]> doublePage(boolean clear) {
final Recycler.V<double[]> v = doublePage.obtain();
if (v.isRecycled() && clear) {
Arrays.fill(v.v(), 0d);
}
return v;
}
public Recycler.V<Object[]> objectPage() {
// object pages are cleared on release anyway
return objectPage.obtain();
}
private static <T> Recycler<T> build(Type type, int limit, Recycler.C<T> c) {
final Recycler<T> recycler;
if (limit == 0) {
recycler = new NoneRecycler<T>(c);
} else {
recycler = type.build(c, limit);
}
return recycler;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cache.recycler;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import static org.elasticsearch.common.inject.Modules.createModule;
/**
*/
public class PageCacheRecyclerModule extends AbstractModule implements SpawnModules {
public static final String CACHE_IMPL = "cache.recycler.page_cache_impl";
private final Settings settings;
public PageCacheRecyclerModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
}
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(createModule(settings.getAsClass(CACHE_IMPL, DefaultPageCacheRecyclerModule.class), settings));
}
}

View File

@ -53,6 +53,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.CacheRecyclerModule;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.transport.support.InternalTransportClient;
@ -289,6 +290,7 @@ public class TransportClient extends AbstractClient {
}
injector.getInstance(CacheRecycler.class).close();
injector.getInstance(PageCacheRecycler.class).close();
CachedStreams.clear();
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lease;
import java.util.Arrays;
/** Utility methods to work with {@link Releasable}s. */
public enum Releasables {
;
private static void rethrow(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
if (t instanceof Error) {
throw (Error) t;
}
throw new RuntimeException(t);
}
private static void release(Iterable<Releasable> releasables, boolean ignoreException) {
Throwable th = null;
for (Releasable releasable : releasables) {
if (releasable != null) {
try {
releasable.release();
} catch (Throwable t) {
if (th == null) {
th = t;
}
}
}
}
if (th != null && !ignoreException) {
rethrow(th);
}
}
/** Release the provided {@link Releasable}s. */
public static void release(Iterable<Releasable> releasables) {
release(releasables, false);
}
/** Release the provided {@link Releasable}s. */
public static void release(Releasable... releasables) {
release(Arrays.asList(releasables));
}
/** Release the provided {@link Releasable}s, ignoring exceptions. */
public static void releaseWhileHandlingException(Iterable<Releasable> releasables) {
release(releasables, true);
}
/** Release the provided {@link Releasable}s, ignoring exceptions. */
public static void releaseWhileHandlingException(Releasable... releasables) {
releaseWhileHandlingException(Arrays.asList(releasables));
}
/** Release the provided {@link Releasable}s, ignoring exceptions if <code>success</code> is <tt>false</tt>. */
public static void release(boolean success, Iterable<Releasable> releasables) {
if (success) {
release(releasables);
} else {
releaseWhileHandlingException(releasables);
}
}
/** Release the provided {@link Releasable}s, ignoring exceptions if <code>success</code> is <tt>false</tt>. */
public static void release(boolean success, Releasable... releasables) {
release(success, Arrays.asList(releasables));
}
}

View File

@ -58,11 +58,12 @@ public class NoneRecycler<T> extends Recycler<T> {
}
@Override
public void release() {
public boolean release() {
if (value == null) {
throw new ElasticSearchIllegalStateException("recycler entry already released...");
}
value = null;
return true;
}
}
}

View File

@ -23,42 +23,53 @@ import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class QueueRecycler<T> extends Recycler<T> {
final Queue<T> queue;
final AtomicInteger size;
final int maxSize;
public QueueRecycler(C<T> c) {
this(c, ConcurrentCollections.<T>newQueue());
public QueueRecycler(C<T> c, int maxSize) {
this(c, ConcurrentCollections.<T>newQueue(), maxSize);
}
public QueueRecycler(C<T> c, Queue<T> queue) {
public QueueRecycler(C<T> c, Queue<T> queue, int maxSize) {
super(c);
this.queue = queue;
this.maxSize = maxSize;
// we maintain size separately because concurrent queue implementations typically have linear-time size() impls
this.size = new AtomicInteger();
}
@Override
public void close() {
assert queue.size() == size.get();
queue.clear();
size.set(0);
}
@Override
public V<T> obtain(int sizing) {
T v = queue.poll();
final T v = queue.poll();
if (v == null) {
v = c.newInstance(sizing);
return new QV(c.newInstance(sizing), false);
}
return new QV(v);
size.decrementAndGet();
return new QV(v, true);
}
class QV implements Recycler.V<T> {
T value;
final boolean recycled;
QV(T value) {
QV(T value, boolean recycled) {
this.value = value;
this.recycled = recycled;
}
@Override
@ -68,17 +79,22 @@ public class QueueRecycler<T> extends Recycler<T> {
@Override
public boolean isRecycled() {
return true;
return recycled;
}
@Override
public void release() {
public boolean release() {
if (value == null) {
throw new ElasticSearchIllegalStateException("recycler entry already released...");
}
c.clear(value);
queue.offer(value);
if (size.incrementAndGet() <= maxSize) {
c.clear(value);
queue.offer(value);
} else {
size.decrementAndGet();
}
value = null;
return true;
}
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.common.recycler;
import org.elasticsearch.common.lease.Releasable;
/**
*/
public abstract class Recycler<T> {
@ -50,18 +52,21 @@ public abstract class Recycler<T> {
public static interface C<T> {
/** Create a new empty instance of the given size. */
T newInstance(int sizing);
/** Clear the data. This operation is called when the data-structure is released. */
void clear(T value);
}
public static interface V<T> {
public static interface V<T> extends Releasable {
/** Reference to the value. */
T v();
/** Whether this instance has been recycled (true) or newly allocated (false). */
boolean isRecycled();
void release();
}
protected final C<T> c;

View File

@ -50,10 +50,11 @@ public class SoftThreadLocalRecycler<T> extends Recycler<T> {
threadLocal.set(new SoftReference<ThreadLocalRecycler.Stack<T>>(stack));
}
T o = stack.pop();
final T o = stack.pop();
if (o == null) {
o = c.newInstance(sizing);
return new ThreadLocalRecycler.TV<T>(stack, c, c.newInstance(sizing), false);
} else {
return new ThreadLocalRecycler.TV<T>(stack, c, o, true);
}
return new ThreadLocalRecycler.TV<T>(stack, c, o);
}
}

View File

@ -50,11 +50,12 @@ public class ThreadLocalRecycler<T> extends Recycler<T> {
@Override
public V<T> obtain(int sizing) {
Stack<T> stack = threadLocal.get();
T o = stack.pop();
final T o = stack.pop();
if (o == null) {
o = c.newInstance(sizing);
return new TV<T>(stack, c, c.newInstance(sizing), false);
} else {
return new TV<T>(stack, c, o, true);
}
return new TV<T>(stack, c, o);
}
static class TV<T> implements Recycler.V<T> {
@ -62,11 +63,13 @@ public class ThreadLocalRecycler<T> extends Recycler<T> {
final Stack<T> stack;
final C<T> c;
T value;
final boolean recycled;
TV(Stack<T> stack, C<T> c, T value) {
TV(Stack<T> stack, C<T> c, T value, boolean recycled) {
this.stack = stack;
this.c = c;
this.value = value;
this.recycled = recycled;
}
@Override
@ -76,11 +79,11 @@ public class ThreadLocalRecycler<T> extends Recycler<T> {
@Override
public boolean isRecycled() {
return true;
return recycled;
}
@Override
public void release() {
public boolean release() {
assert Thread.currentThread() == stack.thread;
if (value == null) {
throw new ElasticSearchIllegalStateException("recycler entry already released...");
@ -88,6 +91,7 @@ public class ThreadLocalRecycler<T> extends Recycler<T> {
c.clear(value);
stack.push(value);
value = null;
return true;
}
}

View File

@ -34,10 +34,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.property.PropertyPlaceholder;
import org.elasticsearch.common.settings.loader.SettingsLoader;
import org.elasticsearch.common.settings.loader.SettingsLoaderFactory;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.unit.*;
import java.io.IOException;
import java.io.InputStream;
@ -289,6 +286,16 @@ public class ImmutableSettings implements Settings {
return parseBytesSizeValue(get(settings), defaultValue);
}
@Override
public ByteSizeValue getAsMemory(String setting, String defaultValue) throws SettingsException {
return MemorySizeValue.parseBytesSizeValueOrHeapRatio(get(setting, defaultValue));
}
@Override
public ByteSizeValue getAsMemory(String[] settings, String defaultValue) throws SettingsException {
return MemorySizeValue.parseBytesSizeValueOrHeapRatio(get(settings, defaultValue));
}
@Override
public SizeValue getAsSize(String setting, SizeValue defaultValue) throws SettingsException {
return parseSizeValue(get(setting), defaultValue);

View File

@ -187,6 +187,20 @@ public interface Settings {
*/
ByteSizeValue getAsBytesSize(String[] settings, ByteSizeValue defaultValue) throws SettingsException;
/**
* Returns the setting value (as size) associated with the setting key. Provided values can either be
* absolute values (interpreted as a number of bytes), byte sizes (eg. 1mb) or a percentage of the heap size
* (eg. 12%). If it does not exist, parses the default value provided.
*/
ByteSizeValue getAsMemory(String setting, String defaultValue) throws SettingsException;
/**
* Returns the setting value (as size) associated with the setting key. Provided values can either be
* absolute values (interpreted as a number of bytes), byte sizes (eg. 1mb) or a percentage of the heap size
* (eg. 12%). If it does not exist, parses the default value provided.
*/
ByteSizeValue getAsMemory(String[] setting, String defaultValue) throws SettingsException;
/**
* Returns the setting value (as size) associated with the setting key. If it does not exist,
* returns the default value provided.

View File

@ -0,0 +1,41 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.unit;
import org.elasticsearch.monitor.jvm.JvmInfo;
import static org.elasticsearch.common.unit.ByteSizeValue.parseBytesSizeValue;
/** Utility methods to get memory sizes. */
public enum MemorySizeValue {
;
/** Parse the provided string as a memory size. This method either accepts absolute values such as
* <tt>42</tt> (default assumed unit is byte) or <tt>2mb</tt>, or percentages of the heap size: if
* the heap is 1G, <tt>10%</tt> will be parsed as <tt>100mb</tt>. */
public static ByteSizeValue parseBytesSizeValueOrHeapRatio(String sValue) {
if (sValue.endsWith("%")) {
double percent = Double.parseDouble(sValue.substring(0, sValue.length() - 1));
return new ByteSizeValue((long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes()), ByteSizeUnit.BYTES);
} else {
return parseBytesSizeValue(sValue);
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.lease.Releasable;
abstract class AbstractArray implements Releasable {
public final PageCacheRecycler recycler;
public final boolean clearOnResize;
private boolean released = false;
AbstractArray(PageCacheRecycler recycler, boolean clearOnResize) {
this.recycler = recycler;
this.clearOnResize = clearOnResize;
}
@Override
public boolean release() {
assert !released : "double release";
released = true;
return true; // nothing to release by default
}
}

View File

@ -20,20 +20,36 @@
package org.elasticsearch.common.util;
import com.google.common.base.Preconditions;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.recycler.Recycler;
import java.lang.reflect.Array;
import java.util.Arrays;
/** Common implementation for array lists that slice data into fixed-size blocks. */
abstract class AbstractBigArray {
abstract class AbstractBigArray extends AbstractArray {
private Recycler.V<?>[] cache;
private final int pageShift;
private final int pageMask;
protected long size;
protected AbstractBigArray(int pageSize) {
protected AbstractBigArray(int pageSize, PageCacheRecycler recycler, boolean clearOnResize) {
super(recycler, clearOnResize);
Preconditions.checkArgument(pageSize >= 128, "pageSize must be >= 128");
Preconditions.checkArgument((pageSize & (pageSize - 1)) == 0, "pageSize must be a power of two");
this.pageShift = Integer.numberOfTrailingZeros(pageSize);
this.pageMask = pageSize - 1;
size = 0;
if (this.recycler != null) {
cache = new Recycler.V<?>[16];
} else {
cache = null;
}
}
final int numPages(long capacity) {
@ -65,4 +81,82 @@ abstract class AbstractBigArray {
return ((long) pageIndex(size - 1) + 1) * pageSize() * numBytesPerElement();
}
private static <T> T[] grow(T[] array, int minSize) {
if (array.length < minSize) {
final int newLen = ArrayUtil.oversize(minSize, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
array = Arrays.copyOf(array, newLen);
}
return array;
}
private <T> T registerNewPage(Recycler.V<T> v, int page, int expectedSize) {
cache = grow(cache, page + 1);
assert cache[page] == null;
cache[page] = v;
assert Array.getLength(v.v()) == expectedSize;
return v.v();
}
protected final byte[] newBytePage(int page) {
if (recycler != null) {
final Recycler.V<byte[]> v = recycler.bytePage(clearOnResize);
return registerNewPage(v, page, BigArrays.BYTE_PAGE_SIZE);
} else {
return new byte[BigArrays.BYTE_PAGE_SIZE];
}
}
protected final int[] newIntPage(int page) {
if (recycler != null) {
final Recycler.V<int[]> v = recycler.intPage(clearOnResize);
return registerNewPage(v, page, BigArrays.INT_PAGE_SIZE);
} else {
return new int[BigArrays.INT_PAGE_SIZE];
}
}
protected final long[] newLongPage(int page) {
if (recycler != null) {
final Recycler.V<long[]> v = recycler.longPage(clearOnResize);
return registerNewPage(v, page, BigArrays.LONG_PAGE_SIZE);
} else {
return new long[BigArrays.LONG_PAGE_SIZE];
}
}
protected final double[] newDoublePage(int page) {
if (recycler != null) {
final Recycler.V<double[]> v = recycler.doublePage(clearOnResize);
return registerNewPage(v, page, BigArrays.DOUBLE_PAGE_SIZE);
} else {
return new double[BigArrays.DOUBLE_PAGE_SIZE];
}
}
protected final Object[] newObjectPage(int page) {
if (recycler != null) {
final Recycler.V<Object[]> v = recycler.objectPage();
return registerNewPage(v, page, BigArrays.OBJECT_PAGE_SIZE);
} else {
return new Object[BigArrays.OBJECT_PAGE_SIZE];
}
}
protected final void releasePage(int page) {
if (recycler != null) {
cache[page].release();
cache[page] = null;
}
}
@Override
public final boolean release() {
super.release();
if (recycler != null) {
Releasables.release(cache);
cache = null;
}
return true;
}
}

View File

@ -19,8 +19,10 @@
package org.elasticsearch.common.util;
import org.elasticsearch.common.lease.Releasable;
/** Base abstraction of an array. */
interface BigArray {
interface BigArray extends Releasable {
/** Return the length of this array. */
public long size();

View File

@ -21,7 +21,9 @@ package org.elasticsearch.common.util;
import com.google.common.base.Preconditions;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import java.util.Arrays;
@ -31,6 +33,11 @@ public enum BigArrays {
/** Page size in bytes: 16KB */
public static final int PAGE_SIZE_IN_BYTES = 1 << 14;
public static final int BYTE_PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_BYTE;
public static final int INT_PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_INT;
public static final int LONG_PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_LONG;
public static final int DOUBLE_PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_DOUBLE;
public static final int OBJECT_PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_OBJECT_REF;
/** Returns the next size to grow when working with parallel arrays that may have different page sizes or number of bytes per element. */
public static long overSize(long minTargetSize) {
@ -64,11 +71,56 @@ public enum BigArrays {
return index == (int) index;
}
private static class IntArrayWrapper implements IntArray {
private static class ByteArrayWrapper extends AbstractArray implements ByteArray {
private final byte[] array;
ByteArrayWrapper(byte[] array, PageCacheRecycler recycler, boolean clearOnResize) {
super(recycler, clearOnResize);
this.array = array;
}
@Override
public long size() {
return array.length;
}
@Override
public byte get(long index) {
assert indexIsInt(index);
return array[(int) index];
}
@Override
public byte set(long index, byte value) {
assert indexIsInt(index);
final byte ret = array[(int) index];
array[(int) index] = value;
return ret;
}
@Override
public void get(long index, int len, BytesRef ref) {
assert indexIsInt(index);
ref.bytes = array;
ref.offset = (int) index;
ref.length = len;
}
@Override
public void set(long index, byte[] buf, int offset, int len) {
assert indexIsInt(index);
System.arraycopy(buf, offset, array, (int) index, len);
}
}
private static class IntArrayWrapper extends AbstractArray implements IntArray {
private final int[] array;
IntArrayWrapper(int[] array) {
IntArrayWrapper(int[] array, PageCacheRecycler recycler, boolean clearOnResize) {
super(recycler, clearOnResize);
this.array = array;
}
@ -99,11 +151,12 @@ public enum BigArrays {
}
private static class LongArrayWrapper implements LongArray {
private static class LongArrayWrapper extends AbstractArray implements LongArray {
private final long[] array;
LongArrayWrapper(long[] array) {
LongArrayWrapper(long[] array, PageCacheRecycler recycler, boolean clearOnResize) {
super(recycler, clearOnResize);
this.array = array;
}
@ -140,11 +193,12 @@ public enum BigArrays {
}
}
private static class DoubleArrayWrapper implements DoubleArray {
private static class DoubleArrayWrapper extends AbstractArray implements DoubleArray {
private final double[] array;
DoubleArrayWrapper(double[] array) {
DoubleArrayWrapper(double[] array, PageCacheRecycler recycler, boolean clearOnResize) {
super(recycler, clearOnResize);
this.array = array;
}
@ -182,11 +236,12 @@ public enum BigArrays {
}
private static class ObjectArrayWrapper<T> implements ObjectArray<T> {
private static class ObjectArrayWrapper<T> extends AbstractArray implements ObjectArray<T> {
private final Object[] array;
ObjectArrayWrapper(Object[] array) {
ObjectArrayWrapper(Object[] array, PageCacheRecycler recycler) {
super(recycler, true);
this.array = array;
}
@ -213,13 +268,55 @@ public enum BigArrays {
}
/** Allocate a new {@link ByteArray} of the given capacity. */
public static ByteArray newByteArray(long size, PageCacheRecycler recycler, boolean clearOnResize) {
if (size <= BYTE_PAGE_SIZE) {
return new ByteArrayWrapper(new byte[(int) size], recycler, clearOnResize);
} else {
return new BigByteArray(size, recycler, clearOnResize);
}
}
/** Allocate a new {@link ByteArray} of the given capacity. */
public static ByteArray newByteArray(long size) {
return newByteArray(size, null, true);
}
/** Resize the array to the exact provided size. */
public static ByteArray resize(ByteArray array, long size) {
if (array instanceof BigByteArray) {
((BigByteArray) array).resize(size);
return array;
} else {
AbstractArray arr = (AbstractArray) array;
final ByteArray newArray = newByteArray(size, arr.recycler, arr.clearOnResize);
final byte[] rawArray = ((ByteArrayWrapper) array).array;
newArray.set(0, rawArray, 0, (int) Math.min(rawArray.length, newArray.size()));
return newArray;
}
}
/** Grow an array to a size that is larger than <code>minSize</code>, preserving content, and potentially reusing part of the provided array. */
public static ByteArray grow(ByteArray array, long minSize) {
if (minSize <= array.size()) {
return array;
}
final long newSize = overSize(minSize, BYTE_PAGE_SIZE, RamUsageEstimator.NUM_BYTES_BYTE);
return resize(array, newSize);
}
/** Allocate a new {@link IntArray} of the given capacity. */
public static IntArray newIntArray(long size, PageCacheRecycler recycler, boolean clearOnResize) {
if (size <= INT_PAGE_SIZE) {
return new IntArrayWrapper(new int[(int) size], recycler, clearOnResize);
} else {
return new BigIntArray(size, recycler, clearOnResize);
}
}
/** Allocate a new {@link IntArray} of the given capacity. */
public static IntArray newIntArray(long size) {
if (size <= BigIntArray.PAGE_SIZE) {
return new IntArrayWrapper(new int[(int) size]);
} else {
return new BigIntArray(size);
}
return newIntArray(size, null, true);
}
/** Resize the array to the exact provided size. */
@ -228,7 +325,8 @@ public enum BigArrays {
((BigIntArray) array).resize(size);
return array;
} else {
final IntArray newArray = newIntArray(size);
AbstractArray arr = (AbstractArray) array;
final IntArray newArray = newIntArray(size, arr.recycler, arr.clearOnResize);
for (long i = 0, end = Math.min(size, array.size()); i < end; ++i) {
newArray.set(i, array.get(i));
}
@ -241,26 +339,32 @@ public enum BigArrays {
if (minSize <= array.size()) {
return array;
}
final long newSize = overSize(minSize, BigIntArray.PAGE_SIZE, RamUsageEstimator.NUM_BYTES_INT);
final long newSize = overSize(minSize, INT_PAGE_SIZE, RamUsageEstimator.NUM_BYTES_INT);
return resize(array, newSize);
}
/** Allocate a new {@link LongArray} of the given capacity. */
public static LongArray newLongArray(long size) {
if (size <= BigLongArray.PAGE_SIZE) {
return new LongArrayWrapper(new long[(int) size]);
public static LongArray newLongArray(long size, PageCacheRecycler recycler, boolean clearOnResize) {
if (size <= LONG_PAGE_SIZE) {
return new LongArrayWrapper(new long[(int) size], recycler, clearOnResize);
} else {
return new BigLongArray(size);
return new BigLongArray(size, recycler, clearOnResize);
}
}
/** Allocate a new {@link LongArray} of the given capacity. */
public static LongArray newLongArray(long size) {
return newLongArray(size, null, true);
}
/** Resize the array to the exact provided size. */
public static LongArray resize(LongArray array, long size) {
if (array instanceof BigLongArray) {
((BigLongArray) array).resize(size);
return array;
} else {
final LongArray newArray = newLongArray(size);
AbstractArray arr = (AbstractArray) array;
final LongArray newArray = newLongArray(size, arr.recycler, arr.clearOnResize);
for (long i = 0, end = Math.min(size, array.size()); i < end; ++i) {
newArray.set(i, array.get(i));
}
@ -273,26 +377,32 @@ public enum BigArrays {
if (minSize <= array.size()) {
return array;
}
final long newSize = overSize(minSize, BigLongArray.PAGE_SIZE, RamUsageEstimator.NUM_BYTES_LONG);
final long newSize = overSize(minSize, LONG_PAGE_SIZE, RamUsageEstimator.NUM_BYTES_LONG);
return resize(array, newSize);
}
/** Allocate a new {@link LongArray} of the given capacity. */
public static DoubleArray newDoubleArray(long size) {
if (size <= BigLongArray.PAGE_SIZE) {
return new DoubleArrayWrapper(new double[(int) size]);
/** Allocate a new {@link DoubleArray} of the given capacity. */
public static DoubleArray newDoubleArray(long size, PageCacheRecycler recycler, boolean clearOnResize) {
if (size <= LONG_PAGE_SIZE) {
return new DoubleArrayWrapper(new double[(int) size], recycler, clearOnResize);
} else {
return new BigDoubleArray(size);
return new BigDoubleArray(size, recycler, clearOnResize);
}
}
/** Allocate a new {@link DoubleArray} of the given capacity. */
public static DoubleArray newDoubleArray(long size) {
return newDoubleArray(size, null, true);
}
/** Resize the array to the exact provided size. */
public static DoubleArray resize(DoubleArray array, long size) {
if (array instanceof BigDoubleArray) {
((BigDoubleArray) array).resize(size);
return array;
} else {
final DoubleArray newArray = newDoubleArray(size);
AbstractArray arr = (AbstractArray) array;
final DoubleArray newArray = newDoubleArray(size, arr.recycler, arr.clearOnResize);
for (long i = 0, end = Math.min(size, array.size()); i < end; ++i) {
newArray.set(i, array.get(i));
}
@ -305,26 +415,31 @@ public enum BigArrays {
if (minSize <= array.size()) {
return array;
}
final long newSize = overSize(minSize, BigDoubleArray.PAGE_SIZE, RamUsageEstimator.NUM_BYTES_DOUBLE);
final long newSize = overSize(minSize, DOUBLE_PAGE_SIZE, RamUsageEstimator.NUM_BYTES_DOUBLE);
return resize(array, newSize);
}
/** Allocate a new {@link LongArray} of the given capacity. */
public static <T> ObjectArray<T> newObjectArray(long size) {
if (size <= BigLongArray.PAGE_SIZE) {
return new ObjectArrayWrapper<T>(new Object[(int) size]);
/** Allocate a new {@link ObjectArray} of the given capacity. */
public static <T> ObjectArray<T> newObjectArray(long size, PageCacheRecycler recycler) {
if (size <= OBJECT_PAGE_SIZE) {
return new ObjectArrayWrapper<T>(new Object[(int) size], recycler);
} else {
return new BigObjectArray<T>(size);
return new BigObjectArray<T>(size, recycler);
}
}
/** Allocate a new {@link ObjectArray} of the given capacity. */
public static <T> ObjectArray<T> newObjectArray(long size) {
return newObjectArray(size, null);
}
/** Resize the array to the exact provided size. */
public static <T> ObjectArray<T> resize(ObjectArray<T> array, long size) {
if (array instanceof BigObjectArray) {
((BigObjectArray<?>) array).resize(size);
return array;
} else {
final ObjectArray<T> newArray = newObjectArray(size);
final ObjectArray<T> newArray = newObjectArray(size, ((AbstractArray) array).recycler);
for (long i = 0, end = Math.min(size, array.size()); i < end; ++i) {
newArray.set(i, array.get(i));
}
@ -337,7 +452,7 @@ public enum BigArrays {
if (minSize <= array.size()) {
return array;
}
final long newSize = overSize(minSize, BigObjectArray.PAGE_SIZE, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
final long newSize = overSize(minSize, OBJECT_PAGE_SIZE, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
return resize(array, newSize);
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import java.util.Arrays;
import static org.elasticsearch.common.util.BigArrays.BYTE_PAGE_SIZE;
/**
* Byte array abstraction able to support more than 2B values. This implementation slices data into fixed-sized blocks of
* configurable length.
*/
final class BigByteArray extends AbstractBigArray implements ByteArray {
private byte[][] pages;
/** Constructor. */
public BigByteArray(long size, PageCacheRecycler recycler, boolean clearOnResize) {
super(BYTE_PAGE_SIZE, recycler, clearOnResize);
this.size = size;
pages = new byte[numPages(size)][];
for (int i = 0; i < pages.length; ++i) {
pages[i] = newBytePage(i);
}
}
@Override
public byte get(long index) {
final int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
return pages[pageIndex][indexInPage];
}
@Override
public byte set(long index, byte value) {
final int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
final byte[] page = pages[pageIndex];
final byte ret = page[indexInPage];
page[indexInPage] = value;
return ret;
}
@Override
public void get(long index, int len, BytesRef ref) {
assert index + len <= size();
int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
if (indexInPage + len <= pageSize()) {
ref.bytes = pages[pageIndex];
ref.offset = indexInPage;
ref.length = len;
} else {
ref.bytes = new byte[len];
ref.offset = 0;
ref.length = pageSize() - indexInPage;
System.arraycopy(pages[pageIndex], indexInPage, ref.bytes, 0, ref.length);
do {
++pageIndex;
final int copyLength = Math.min(pageSize(), len - ref.length);
System.arraycopy(pages[pageIndex], 0, ref.bytes, ref.length, copyLength);
ref.length += copyLength;
} while (ref.length < len);
}
}
@Override
public void set(long index, byte[] buf, int offset, int len) {
assert index + len <= size();
int pageIndex = pageIndex(index);
final int indexInPage = indexInPage(index);
if (indexInPage + len <= pageSize()) {
System.arraycopy(buf, offset, pages[pageIndex], indexInPage, len);
} else {
int copyLen = pageSize() - indexInPage;
System.arraycopy(buf, offset, pages[pageIndex], indexInPage, copyLen);
do {
++pageIndex;
offset += copyLen;
len -= copyLen;
copyLen = Math.min(len, pageSize());
System.arraycopy(buf, offset, pages[pageIndex], 0, copyLen);
} while (len > copyLen);
}
}
@Override
protected int numBytesPerElement() {
return RamUsageEstimator.NUM_BYTES_BYTE;
}
/** Change the size of this array. Content between indexes <code>0</code> and <code>min(size(), newSize)</code> will be preserved. */
public void resize(long newSize) {
final int numPages = numPages(newSize);
if (numPages > pages.length) {
pages = Arrays.copyOf(pages, ArrayUtil.oversize(numPages, RamUsageEstimator.NUM_BYTES_OBJECT_REF));
}
for (int i = numPages - 1; i >= 0 && pages[i] == null; --i) {
pages[i] = newBytePage(i);
}
for (int i = numPages; i < pages.length && pages[i] != null; ++i) {
pages[i] = null;
releasePage(i);
}
this.size = newSize;
}
}

View File

@ -22,30 +22,27 @@ package org.elasticsearch.common.util;
import com.google.common.base.Preconditions;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import java.util.Arrays;
import static org.elasticsearch.common.util.BigArrays.DOUBLE_PAGE_SIZE;
/**
* Double array abstraction able to support more than 2B values. This implementation slices data into fixed-sized blocks of
* configurable length.
*/
final class BigDoubleArray extends AbstractBigArray implements DoubleArray {
/**
* Page size, 16KB of memory per page.
*/
public static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_DOUBLE;
private double[][] pages;
/** Constructor. */
public BigDoubleArray(long size) {
super(PAGE_SIZE);
public BigDoubleArray(long size, PageCacheRecycler recycler, boolean clearOnResize) {
super(DOUBLE_PAGE_SIZE, recycler, clearOnResize);
this.size = size;
pages = new double[numPages(size)][];
for (int i = 0; i < pages.length; ++i) {
pages[i] = new double[pageSize()];
pages[i] = newDoublePage(i);
}
}
@ -85,10 +82,11 @@ final class BigDoubleArray extends AbstractBigArray implements DoubleArray {
pages = Arrays.copyOf(pages, ArrayUtil.oversize(numPages, RamUsageEstimator.NUM_BYTES_OBJECT_REF));
}
for (int i = numPages - 1; i >= 0 && pages[i] == null; --i) {
pages[i] = new double[pageSize()];
pages[i] = newDoublePage(i);
}
for (int i = numPages; i < pages.length && pages[i] != null; ++i) {
pages[i] = null;
releasePage(i);
}
this.size = newSize;
}

View File

@ -38,7 +38,7 @@ public final class BigDoubleArrayList extends AbstractBigArray {
private double[][] pages;
public BigDoubleArrayList(int pageSize, long initialCapacity) {
super(pageSize);
super(pageSize, null, true);
pages = new double[numPages(initialCapacity)][];
}

View File

@ -36,7 +36,7 @@ public final class BigFloatArrayList extends AbstractBigArray {
private float[][] pages;
public BigFloatArrayList(int pageSize, long initialCapacity) {
super(pageSize);
super(pageSize, null, true);
pages = new float[numPages(initialCapacity)][];
}

View File

@ -21,30 +21,27 @@ package org.elasticsearch.common.util;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import java.util.Arrays;
import static org.elasticsearch.common.util.BigArrays.INT_PAGE_SIZE;
/**
* Int array abstraction able to support more than 2B values. This implementation slices data into fixed-sized blocks of
* configurable length.
*/
final class BigIntArray extends AbstractBigArray implements IntArray {
/**
* Page size, 16KB of memory per page.
*/
public static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_INT;
private int[][] pages;
/** Constructor. */
public BigIntArray(long size) {
super(PAGE_SIZE);
public BigIntArray(long size, PageCacheRecycler recycler, boolean clearOnResize) {
super(INT_PAGE_SIZE, recycler, clearOnResize);
this.size = size;
pages = new int[numPages(size)][];
for (int i = 0; i < pages.length; ++i) {
pages[i] = new int[pageSize()];
pages[i] = newIntPage(i);
}
}
@ -84,10 +81,11 @@ final class BigIntArray extends AbstractBigArray implements IntArray {
pages = Arrays.copyOf(pages, ArrayUtil.oversize(numPages, RamUsageEstimator.NUM_BYTES_OBJECT_REF));
}
for (int i = numPages - 1; i >= 0 && pages[i] == null; --i) {
pages[i] = new int[pageSize()];
pages[i] = newIntPage(i);
}
for (int i = numPages; i < pages.length && pages[i] != null; ++i) {
pages[i] = null;
releasePage(i);
}
this.size = newSize;
}

View File

@ -22,30 +22,27 @@ package org.elasticsearch.common.util;
import com.google.common.base.Preconditions;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import java.util.Arrays;
import static org.elasticsearch.common.util.BigArrays.LONG_PAGE_SIZE;
/**
* Long array abstraction able to support more than 2B values. This implementation slices data into fixed-sized blocks of
* configurable length.
*/
final class BigLongArray extends AbstractBigArray implements LongArray {
/**
* Page size, 16KB of memory per page.
*/
public static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_LONG;
private long[][] pages;
/** Constructor. */
public BigLongArray(long size) {
super(PAGE_SIZE);
public BigLongArray(long size, PageCacheRecycler recycler, boolean clearOnResize) {
super(LONG_PAGE_SIZE, recycler, clearOnResize);
this.size = size;
pages = new long[numPages(size)][];
for (int i = 0; i < pages.length; ++i) {
pages[i] = new long[pageSize()];
pages[i] = newLongPage(i);
}
}
@ -85,10 +82,11 @@ final class BigLongArray extends AbstractBigArray implements LongArray {
pages = Arrays.copyOf(pages, ArrayUtil.oversize(numPages, RamUsageEstimator.NUM_BYTES_OBJECT_REF));
}
for (int i = numPages - 1; i >= 0 && pages[i] == null; --i) {
pages[i] = new long[pageSize()];
pages[i] = newLongPage(i);
}
for (int i = numPages; i < pages.length && pages[i] != null; ++i) {
pages[i] = null;
releasePage(i);
}
this.size = newSize;
}

View File

@ -21,30 +21,27 @@ package org.elasticsearch.common.util;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import java.util.Arrays;
import static org.elasticsearch.common.util.BigArrays.OBJECT_PAGE_SIZE;
/**
* Object array abstraction able to support more than 2B values. This implementation slices data into fixed-sized blocks of
* configurable length.
*/
final class BigObjectArray<T> extends AbstractBigArray implements ObjectArray<T> {
/**
* Page size, 16KB of memory per page.
*/
public static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES / RamUsageEstimator.NUM_BYTES_OBJECT_REF;
private Object[][] pages;
/** Constructor. */
public BigObjectArray(long size) {
super(PAGE_SIZE);
public BigObjectArray(long size, PageCacheRecycler recycler) {
super(OBJECT_PAGE_SIZE, recycler, true);
this.size = size;
pages = new Object[numPages(size)][];
for (int i = 0; i < pages.length; ++i) {
pages[i] = new Object[pageSize()];
pages[i] = newObjectPage(i);
}
}
@ -79,10 +76,11 @@ final class BigObjectArray<T> extends AbstractBigArray implements ObjectArray<T>
pages = Arrays.copyOf(pages, ArrayUtil.oversize(numPages, RamUsageEstimator.NUM_BYTES_OBJECT_REF));
}
for (int i = numPages - 1; i >= 0 && pages[i] == null; --i) {
pages[i] = new Object[pageSize()];
pages[i] = newObjectPage(i);
}
for (int i = numPages; i < pages.length && pages[i] != null; ++i) {
pages[i] = null;
releasePage(i);
}
this.size = newSize;
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import org.apache.lucene.util.BytesRef;
/**
* Abstraction of an array of byte values.
*/
public interface ByteArray extends BigArray {
/**
* Get an element given its index.
*/
public abstract byte get(long index);
/**
* Set a value at the given index and return the previous value.
*/
public abstract byte set(long index, byte value);
/**
* Get a reference to a slice.
*/
public abstract void get(long index, int len, BytesRef ref);
/**
* Bulk set.
*/
public abstract void set(long index, byte[] buf, int offset, int len);
}
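A short usage sketch of this interface (not from the commit): the factory call mirrors BigArrays.newByteArray(size, recycler, clearOnResize) as it appears later in this change, and the recycler is assumed to be optional (null means plain allocation).
// Hedged sketch: store a small buffer in a paged ByteArray and read a slice back.
ByteArray bytes = BigArrays.newByteArray(1024, recycler, false); // 'recycler' may be null
byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
bytes.set(0, payload, 0, payload.length);      // bulk set
BytesRef ref = new BytesRef();
bytes.get(0, payload.length, ref);             // slice view, or a copy if it spans pages
assert ref.length == payload.length;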

View File

@ -0,0 +1,114 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import org.apache.lucene.util.BytesRef;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
/** Utility methods that use {@link Unsafe}. */
public enum UnsafeUtils {
;
private static final Unsafe UNSAFE;
private static final long BYTE_ARRAY_OFFSET;
private static final int BYTE_ARRAY_SCALE;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
UNSAFE = (Unsafe) theUnsafe.get(null);
BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
BYTE_ARRAY_SCALE = UNSAFE.arrayIndexScale(byte[].class);
} catch (IllegalAccessException e) {
throw new ExceptionInInitializerError("Cannot access Unsafe");
} catch (NoSuchFieldException e) {
throw new ExceptionInInitializerError("Cannot access Unsafe");
} catch (SecurityException e) {
throw new ExceptionInInitializerError("Cannot access Unsafe");
}
}
// Don't expose these methods directly, they are too easy to mis-use since they depend on the byte order.
// If you need methods to read integers, please expose a method that makes the byte order explicit such
// as readIntLE (little endian).
// Also, please ***NEVER*** expose any method that writes using Unsafe, this is too dangerous
private static long readLong(byte[] src, int offset) {
return UNSAFE.getLong(src, BYTE_ARRAY_OFFSET + offset);
}
private static int readInt(byte[] src, int offset) {
return UNSAFE.getInt(src, BYTE_ARRAY_OFFSET + offset);
}
private static short readShort(byte[] src, int offset) {
return UNSAFE.getShort(src, BYTE_ARRAY_OFFSET + offset);
}
private static byte readByte(byte[] src, int offset) {
return UNSAFE.getByte(src, BYTE_ARRAY_OFFSET + BYTE_ARRAY_SCALE * offset);
}
/** Compare the two given {@link BytesRef}s for equality. */
public static boolean equals(BytesRef b1, BytesRef b2) {
int len = b1.length;
if (b2.length != len) {
return false;
}
int o1 = b1.offset, o2 = b2.offset;
while (len >= 8) {
if (readLong(b1.bytes, o1) != readLong(b2.bytes, o2)) {
return false;
}
len -= 8;
o1 += 8;
o2 += 8;
}
if (len >= 4) {
if (readInt(b1.bytes, o1) != readInt(b2.bytes, o2)) {
return false;
}
len -= 4;
o1 += 4;
o2 += 4;
}
if (len >= 2) {
if (readShort(b1.bytes, o1) != readShort(b2.bytes, o2)) {
return false;
}
len -= 2;
o1 += 2;
o2 += 2;
}
if (len == 1) {
if (readByte(b1.bytes, o1) != readByte(b2.bytes, o2)) {
return false;
}
} else {
assert len == 0;
}
return true;
}
}
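A small, hedged example of the contract: equality depends only on byte content, not on which backing arrays or offsets the refs use, and the comparison proceeds 8, then 4, then 2, then 1 bytes at a time.
// Hedged sketch of UnsafeUtils.equals (assumes ASCII content for clarity).
byte[] backing = "xxelasticsearchxx".getBytes(java.nio.charset.StandardCharsets.UTF_8);
BytesRef a = new BytesRef(backing, 2, 13);         // "elasticsearch" inside a larger array
BytesRef b = new BytesRef("elasticsearch");
assert UnsafeUtils.equals(a, b);
assert !UnsafeUtils.equals(b, new BytesRef("elasticsearcX"));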

View File

@ -32,11 +32,11 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.MemorySizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
@ -123,12 +123,7 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
}
private void computeSizeInBytes() {
if (size.endsWith("%")) {
double percent = Double.parseDouble(size.substring(0, size.length() - 1));
sizeInBytes = (long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
} else {
sizeInBytes = ByteSizeValue.parseBytesSizeValue(size).bytes();
}
this.sizeInBytes = MemorySizeValue.parseBytesSizeValueOrHeapRatio(size).bytes();
}
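The branch removed above is roughly what MemorySizeValue.parseBytesSizeValueOrHeapRatio is assumed to centralize: a trailing '%' is read as a ratio of the maximum heap, anything else as an absolute byte size. A hedged sketch of that behaviour:
// Hedged sketch, equivalent to the removed branch above.
long bytes;
if (size.endsWith("%")) {
    double percent = Double.parseDouble(size.substring(0, size.length() - 1));
    bytes = (long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
} else {
    bytes = ByteSizeValue.parseBytesSizeValue(size).bytes();
}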
public void addReaderKeyToClean(Object readerKey) {

View File

@ -23,20 +23,21 @@ import com.google.common.cache.*;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.SegmentReader;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.monitor.jvm.JvmInfo;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@ -45,8 +46,6 @@ import java.util.concurrent.TimeUnit;
*/
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, AtomicFieldData> {
private static final long JVM_HEAP_MAX_BYTES = JvmInfo.jvmInfo().getMem().getHeapMax().bytes();
Cache<Key, AtomicFieldData> cache;
private volatile String size;
@ -58,8 +57,8 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
public IndicesFieldDataCache(Settings settings) {
super(settings);
this.size = componentSettings.get("size", "-1");
this.sizeInBytes = componentSettings.getAsMemory("size", "-1").bytes();
this.expire = componentSettings.getAsTime("expire", null);
this.sizeInBytes = computeSizeInBytes();
buildCache();
}
@ -78,20 +77,6 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
cache = cacheBuilder.build();
}
/**
* @return the maximum configured size for the field data cache, in bytes, or -1 if not set
*/
public long computeSizeInBytes() {
if (size.equals("-1")) {
return -1;
} else if (size.endsWith("%")) {
double percent = Double.parseDouble(size.substring(0, size.length() - 1));
return (long) ((percent / 100) * JVM_HEAP_MAX_BYTES);
} else {
return ByteSizeValue.parseBytesSizeValue(size).bytes();
}
}
public void close() {
cache.invalidateAll();
}

View File

@ -29,6 +29,8 @@ import org.elasticsearch.cache.NodeCache;
import org.elasticsearch.cache.NodeCacheModule;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.CacheRecyclerModule;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecyclerModule;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClientModule;
import org.elasticsearch.cluster.ClusterModule;
@ -146,6 +148,7 @@ public final class InternalNode implements Node {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new CacheRecyclerModule(settings));
modules.add(new PageCacheRecyclerModule(settings));
modules.add(new PluginsModule(settings, pluginsService));
modules.add(new SettingsModule(settings));
modules.add(new NodeModule(this));
@ -364,6 +367,7 @@ public final class InternalNode implements Node {
injector.getInstance(NodeEnvironment.class).close();
injector.getInstance(CacheRecycler.class).close();
injector.getInstance(PageCacheRecycler.class).close();
Injectors.close(injector);
CachedStreams.clear();

View File

@ -31,6 +31,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.HashedBytesRef;
import org.elasticsearch.common.text.StringText;
@ -93,6 +94,7 @@ public class PercolateContext extends SearchContext {
private final IndexFieldDataService fieldDataService;
private final IndexShard indexShard;
private final CacheRecycler cacheRecycler;
private final PageCacheRecycler pageCacheRecycler;
private final ConcurrentMap<HashedBytesRef, Query> percolateQueries;
private String[] types;
@ -111,7 +113,7 @@ public class PercolateContext extends SearchContext {
private SearchContextAggregations aggregations;
private QuerySearchResult querySearchResult;
public PercolateContext(PercolateShardRequest request, SearchShardTarget searchShardTarget, IndexShard indexShard, IndexService indexService, CacheRecycler cacheRecycler) {
public PercolateContext(PercolateShardRequest request, SearchShardTarget searchShardTarget, IndexShard indexShard, IndexService indexService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler) {
this.request = request;
this.indexShard = indexShard;
this.indexService = indexService;
@ -120,6 +122,7 @@ public class PercolateContext extends SearchContext {
this.percolateQueries = indexShard.percolateRegistry().percolateQueries();
this.types = new String[]{request.documentType()};
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
this.querySearchResult = new QuerySearchResult(0, searchShardTarget);
this.engineSearcher = indexShard.acquireSearcher("percolate");
this.searcher = new ContextIndexSearcher(this, engineSearcher);
@ -473,6 +476,11 @@ public class PercolateContext extends SearchContext {
return cacheRecycler;
}
@Override
public PageCacheRecycler pageCacheRecycler() {
return pageCacheRecycler;
}
@Override
public FilterCache filterCache() {
throw new UnsupportedOperationException();

View File

@ -37,6 +37,7 @@ import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.percolate.PercolateShardResponse;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -71,7 +72,10 @@ import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.percolator.QueryCollector.*;
import org.elasticsearch.percolator.QueryCollector.Count;
import org.elasticsearch.percolator.QueryCollector.Match;
import org.elasticsearch.percolator.QueryCollector.MatchAndScore;
import org.elasticsearch.percolator.QueryCollector.MatchAndSort;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationPhase;
@ -104,6 +108,7 @@ public class PercolatorService extends AbstractComponent {
private final IndicesService indicesService;
private final ByteObjectOpenHashMap<PercolatorType> percolatorTypes;
private final CacheRecycler cacheRecycler;
private final PageCacheRecycler pageCacheRecycler;
private final ClusterService clusterService;
private final FacetPhase facetPhase;
@ -111,12 +116,13 @@ public class PercolatorService extends AbstractComponent {
private final AggregationPhase aggregationPhase;
@Inject
public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler,
public PercolatorService(Settings settings, IndicesService indicesService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler,
HighlightPhase highlightPhase, ClusterService clusterService, FacetPhase facetPhase,
AggregationPhase aggregationPhase) {
super(settings);
this.indicesService = indicesService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
this.clusterService = clusterService;
this.highlightPhase = highlightPhase;
this.facetPhase = facetPhase;
@ -155,7 +161,7 @@ public class PercolatorService extends AbstractComponent {
SearchShardTarget searchShardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
final PercolateContext context = new PercolateContext(
request, searchShardTarget, indexShard, percolateIndexService, cacheRecycler
request, searchShardTarget, indexShard, percolateIndexService, cacheRecycler, pageCacheRecycler
);
try {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
@ -100,6 +101,8 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
private final CacheRecycler cacheRecycler;
private final PageCacheRecycler pageCacheRecycler;
private final DfsPhase dfsPhase;
private final QueryPhase queryPhase;
@ -118,7 +121,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
@Inject
public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, IndicesWarmer indicesWarmer, ThreadPool threadPool,
ScriptService scriptService, CacheRecycler cacheRecycler, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) {
ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
@ -126,6 +129,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
this.indicesWarmer = indicesWarmer;
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
this.dfsPhase = dfsPhase;
this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase;
@ -477,7 +481,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, cacheRecycler);
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler);
SearchContext.setCurrent(context);
try {
context.scroll(request.scroll());

View File

@ -26,6 +26,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.lucene.search.XCollector;
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
@ -104,34 +105,40 @@ public class AggregationPhase implements SearchPhase {
}
Aggregator[] aggregators = context.aggregations().aggregators();
List<Aggregator> globals = new ArrayList<Aggregator>();
for (int i = 0; i < aggregators.length; i++) {
if (aggregators[i] instanceof GlobalAggregator) {
globals.add(aggregators[i]);
boolean success = false;
try {
List<Aggregator> globals = new ArrayList<Aggregator>();
for (int i = 0; i < aggregators.length; i++) {
if (aggregators[i] instanceof GlobalAggregator) {
globals.add(aggregators[i]);
}
}
}
// optimize the global collector based execution
if (!globals.isEmpty()) {
AggregationsCollector collector = new AggregationsCollector(globals, context.aggregations().aggregationContext());
Query query = new XConstantScoreQuery(Queries.MATCH_ALL_FILTER);
Filter searchFilter = context.searchFilter(context.types());
if (searchFilter != null) {
query = new XFilteredQuery(query, searchFilter);
// optimize the global collector based execution
if (!globals.isEmpty()) {
AggregationsCollector collector = new AggregationsCollector(globals, context.aggregations().aggregationContext());
Query query = new XConstantScoreQuery(Queries.MATCH_ALL_FILTER);
Filter searchFilter = context.searchFilter(context.types());
if (searchFilter != null) {
query = new XFilteredQuery(query, searchFilter);
}
try {
context.searcher().search(query, collector);
} catch (Exception e) {
throw new QueryPhaseExecutionException(context, "Failed to execute global aggregators", e);
}
collector.postCollection();
}
try {
context.searcher().search(query, collector);
} catch (Exception e) {
throw new QueryPhaseExecutionException(context, "Failed to execute global aggregators", e);
}
collector.postCollection();
}
List<InternalAggregation> aggregations = new ArrayList<InternalAggregation>(aggregators.length);
for (Aggregator aggregator : context.aggregations().aggregators()) {
aggregations.add(aggregator.buildAggregation(0));
List<InternalAggregation> aggregations = new ArrayList<InternalAggregation>(aggregators.length);
for (Aggregator aggregator : context.aggregations().aggregators()) {
aggregations.add(aggregator.buildAggregation(0));
}
context.queryResult().aggregations(new InternalAggregations(aggregations));
success = true;
} finally {
Releasables.release(success, aggregators);
}
context.queryResult().aggregations(new InternalAggregations(aggregations));
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.internal.SearchContext;
@ -27,7 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public abstract class Aggregator {
public abstract class Aggregator implements Releasable {
/**
* Defines the nature of the aggregator's aggregation execution when nested in other aggregators and the buckets they create.
@ -149,6 +151,22 @@ public abstract class Aggregator {
doPostCollection();
}
/** Called upon release of the aggregator. */
@Override
public boolean release() {
boolean success = false;
try {
doRelease();
success = true;
} finally {
Releasables.release(success, subAggregators);
}
return true;
}
/** Release instance-specific data. */
protected void doRelease() {}
/**
* Can be overridden by aggregator implementations to be called back when the collection phase ends.
*/

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.Aggregator.BucketAggregationMode;
@ -64,7 +65,7 @@ public class AggregatorFactories {
ObjectArray<Aggregator> aggregators;
{
aggregators = BigArrays.newObjectArray(estimatedBucketsCount);
aggregators = BigArrays.newObjectArray(estimatedBucketsCount, context.pageCacheRecycler());
aggregators.set(0, first);
for (long i = 1; i < estimatedBucketsCount; ++i) {
aggregators.set(i, factory.create(parent.context(), parent, estimatedBucketsCount));
@ -106,6 +107,11 @@ public class AggregatorFactories {
public InternalAggregation buildEmptyAggregation() {
return first.buildEmptyAggregation();
}
@Override
public void doRelease() {
Releasables.release(aggregators);
}
};
}
return aggregators;

View File

@ -0,0 +1,138 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import com.google.common.base.Preconditions;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
/**
* Base implementation for {@link BytesRefHash} and {@link LongHash}.
*/
// IDs are internally stored as id + 1 so that 0 encodes for an empty slot
abstract class AbstractHash implements Releasable {
// Open addressing typically requires having smaller load factors compared to linked lists because
// collisions may result in worse lookup performance.
static final float DEFAULT_MAX_LOAD_FACTOR = 0.6f;
final float maxLoadFactor;
long size, maxSize;
LongArray ids;
long mask;
AbstractHash(long capacity, float maxLoadFactor, PageCacheRecycler recycler) {
Preconditions.checkArgument(capacity >= 0, "capacity must be >= 0");
Preconditions.checkArgument(maxLoadFactor > 0 && maxLoadFactor < 1, "maxLoadFactor must be > 0 and < 1");
this.maxLoadFactor = maxLoadFactor;
long buckets = 1L + (long) (capacity / maxLoadFactor);
buckets = Math.max(1, Long.highestOneBit(buckets - 1) << 1); // next power of two
assert buckets == Long.highestOneBit(buckets);
maxSize = (long) (buckets * maxLoadFactor);
assert maxSize >= capacity;
size = 0;
ids = BigArrays.newLongArray(buckets, recycler, true);
mask = buckets - 1;
}
/**
* Return the number of allocated slots to store this hash table.
*/
public long capacity() {
return ids.size();
}
/**
* Return the number of longs in this hash table.
*/
public long size() {
return size;
}
static long slot(long hash, long mask) {
return hash & mask;
}
static long nextSlot(long curSlot, long mask) {
return (curSlot + 1) & mask; // linear probing
}
/**
* Get the id associated with the key at <code>0 &lt;= index &lt; capacity()</code> or -1 if this slot is unused.
*/
public long id(long index) {
return ids.get(index) - 1;
}
protected final long id(long index, long id) {
return ids.set(index, id + 1) - 1;
}
/** Resize keys to the given capacity. */
protected void resizeKeys(long capacity) {}
/** Remove the key at the given index and add it back under the given id. */
protected abstract void removeAndAdd(long index, long id);
protected final void grow() {
// Unlike standard hash tables, this implementation of grow() grows in-place, which makes
// the re-mapping of keys to slots a bit more tricky.
assert size == maxSize;
final long prevSize = size;
final long buckets = capacity();
// Resize arrays
final long newBuckets = buckets << 1;
assert newBuckets == Long.highestOneBit(newBuckets) : newBuckets; // power of 2
resizeKeys(newBuckets);
ids = BigArrays.resize(ids, newBuckets);
mask = newBuckets - 1;
// First let's remap in-place: most data will be put in its final position directly
for (long i = 0; i < buckets; ++i) {
final long id = id(i, -1);
if (id != -1) {
removeAndAdd(i, id);
}
}
// The only entries which have not been put in their final position in the previous loop are those that were stored in a slot that
// is < slot(key, mask). This only happens when slot(key, mask) returned a slot that was close to the end of the array and collision
// resolution has put it back in the first slots. This time, collision resolution will have put them at the beginning of the newly
// allocated slots. Let's re-add them to make sure they are in the right slot. This 2nd loop will typically exit very early.
for (long i = buckets; i < newBuckets; ++i) {
final long id = id(i, -1);
if (id != -1) {
removeAndAdd(i, id); // add it back
} else {
break;
}
}
assert size == prevSize;
maxSize = (long) (newBuckets * maxLoadFactor);
assert size < maxSize;
}
@Override
public boolean release() {
Releasables.release(ids);
return true;
}
}
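The id + 1 encoding noted above means a zero-filled ids array already represents an all-empty table, which is why it can be allocated with clearOnResize set to true and never needs an explicit fill. A hedged sketch (a null recycler is assumed to be accepted, as in BigDoubleArrayList earlier in this change):
// Hedged sketch of the id + 1 slot encoding.
LongArray ids = BigArrays.newLongArray(8, null, true); // zero-filled pages
long raw = ids.get(3);          // 0: slot unused
long id = raw - 1;              // -1: the sentinel id() returns for an empty slot
ids.set(3, 42 + 1);             // store id 42 at slot 3
assert ids.get(3) - 1 == 42;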

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.Aggregator;
@ -35,12 +36,12 @@ import java.util.Arrays;
*/
public abstract class BucketsAggregator extends Aggregator {
protected LongArray docCounts;
private LongArray docCounts;
public BucketsAggregator(String name, BucketAggregationMode bucketAggregationMode, AggregatorFactories factories,
long estimatedBucketsCount, AggregationContext context, Aggregator parent) {
super(name, bucketAggregationMode, factories, estimatedBucketsCount, context, parent);
docCounts = BigArrays.newLongArray(estimatedBucketsCount);
docCounts = BigArrays.newLongArray(estimatedBucketsCount, context.pageCacheRecycler(), true);
}
/**
@ -100,4 +101,16 @@ public abstract class BucketsAggregator extends Aggregator {
return new InternalAggregations(Arrays.asList(aggregations));
}
@Override
public final boolean release() {
boolean success = false;
try {
super.release();
success = true;
} finally {
Releasables.release(success, docCounts);
}
return true;
}
}

View File

@ -0,0 +1,170 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import com.carrotsearch.hppc.hash.MurmurHash3;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.*;
/**
* Specialized hash table implementation similar to Lucene's BytesRefHash that maps
* BytesRef values to ids. Collisions are resolved with open addressing and linear
* probing, growth is smooth thanks to {@link BigArrays}, hashes are cached for faster
* re-hashing and capacity is always a power of 2 for faster identification of buckets.
*/
public final class BytesRefHash extends AbstractHash {
private LongArray startOffsets;
private ByteArray bytes;
private IntArray hashes; // we cache hashes for faster re-hashing
private final BytesRef spare;
// Constructor with configurable capacity and default maximum load factor.
public BytesRefHash(long capacity, PageCacheRecycler recycler) {
this(capacity, DEFAULT_MAX_LOAD_FACTOR, recycler);
}
// Constructor with configurable capacity and load factor.
public BytesRefHash(long capacity, float maxLoadFactor, PageCacheRecycler recycler) {
super(capacity, maxLoadFactor, recycler);
startOffsets = BigArrays.newLongArray(capacity + 1, recycler, false);
bytes = BigArrays.newByteArray(capacity * 3, recycler, false);
hashes = BigArrays.newIntArray(capacity, recycler, false);
spare = new BytesRef();
}
// BytesRef has a weak hashCode function so we try to improve it by rehashing using Murmur3
// Feel free to remove rehashing if BytesRef gets a better hash function
private static int rehash(int hash) {
return MurmurHash3.hash(hash);
}
/**
* Return the key at <code>0 &lt;= index &lt; capacity()</code>. The result is undefined if the slot is unused.
*/
public BytesRef get(long id, BytesRef dest) {
final long startOffset = startOffsets.get(id);
final int length = (int) (startOffsets.get(id + 1) - startOffset);
bytes.get(startOffset, length, dest);
return dest;
}
/**
* Get the id associated with <code>key</code>
*/
public long find(BytesRef key, int code) {
final long slot = slot(rehash(code), mask);
for (long index = slot; ; index = nextSlot(index, mask)) {
final long id = id(index);
if (id == -1L || UnsafeUtils.equals(key, get(id, spare))) {
return id;
}
}
}
/** Sugar for {@link #find(BytesRef, int) find(key, key.hashCode())}. */
public long find(BytesRef key) {
return find(key, key.hashCode());
}
private long set(BytesRef key, int code, long id) {
assert rehash(key.hashCode()) == code;
assert size < maxSize;
final long slot = slot(code, mask);
for (long index = slot; ; index = nextSlot(index, mask)) {
final long curId = id(index);
if (curId == -1) { // means unset
id(index, id);
append(id, key, code);
++size;
return id;
} else if (UnsafeUtils.equals(key, get(curId, spare))) {
return -1 - curId;
}
}
}
private void append(long id, BytesRef key, int code) {
assert size == id;
final long startOffset = startOffsets.get(size);
bytes = BigArrays.grow(bytes, startOffset + key.length);
bytes.set(startOffset, key.bytes, key.offset, key.length);
startOffsets = BigArrays.grow(startOffsets, size + 2);
startOffsets.set(size + 1, startOffset + key.length);
hashes = BigArrays.grow(hashes, id + 1);
hashes.set(id, code);
}
private boolean assertConsistent(long id, int code) {
get(id, spare);
return rehash(spare.hashCode()) == code;
}
private void reset(int code, long id) {
assert assertConsistent(id, code);
final long slot = slot(code, mask);
for (long index = slot; ; index = nextSlot(index, mask)) {
final long curId = id(index);
if (curId == -1) { // means unset
id(index, id);
break;
}
}
}
/**
* Try to add <code>key</code>. Return its newly allocated id if it wasn't in the hash table yet, or <code>-1 - id</code>
* if it was already present in the hash table.
*/
public long add(BytesRef key, int code) {
if (size >= maxSize) {
assert size == maxSize;
grow();
}
assert size < maxSize;
return set(key, rehash(code), size);
}
/** Sugar for {@link #add(BytesRef, int) add(key, key.hashCode())}. */
public long add(BytesRef key) {
return add(key, key.hashCode());
}
@Override
protected void removeAndAdd(long index, long id) {
final int code = hashes.get(id);
reset(code, id);
}
@Override
public boolean release() {
boolean success = false;
try {
super.release();
success = true;
} finally {
Releasables.release(success, bytes, hashes);
}
return true;
}
}
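A hedged usage sketch of the add/find contract: ids are allocated densely starting at 0, add returns -1 - id when the key is already present, and find returns -1 for a missing key (assumes a PageCacheRecycler instance named recycler):
// Hedged sketch of BytesRefHash usage.
BytesRefHash ords = new BytesRefHash(16, recycler);
long id = ords.add(new BytesRef("foo"));          // 0: newly inserted
long dup = ords.add(new BytesRef("foo"));         // -1 - 0 == -1: already present
assert -1 - dup == id;                            // recover the original id
assert ords.find(new BytesRef("bar")) == -1;      // absent key
ords.release();                                   // hand pages back to the recycler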

View File

@ -20,7 +20,8 @@
package org.elasticsearch.search.aggregations.bucket;
import com.carrotsearch.hppc.hash.MurmurHash3;
import com.google.common.base.Preconditions;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
@ -31,51 +32,19 @@ import org.elasticsearch.common.util.LongArray;
* a power of 2 for faster identification of buckets.
*/
// IDs are internally stored as id + 1 so that 0 encodes for an empty slot
public final class LongHash {
public final class LongHash extends AbstractHash {
// Open addressing typically requires having smaller load factors compared to linked lists because
// collisions may result into worse lookup performance.
private static final float DEFAULT_MAX_LOAD_FACTOR = 0.6f;
private final float maxLoadFactor;
private long size, maxSize;
private LongArray keys;
private LongArray ids;
private long mask;
// Constructor with configurable capacity and default maximum load factor.
public LongHash(long capacity) {
this(capacity, DEFAULT_MAX_LOAD_FACTOR);
public LongHash(long capacity, PageCacheRecycler recycler) {
this(capacity, DEFAULT_MAX_LOAD_FACTOR, recycler);
}
//Constructor with configurable capacity and load factor.
public LongHash(long capacity, float maxLoadFactor) {
Preconditions.checkArgument(capacity >= 0, "capacity must be >= 0");
Preconditions.checkArgument(maxLoadFactor > 0 && maxLoadFactor < 1, "maxLoadFactor must be > 0 and < 1");
this.maxLoadFactor = maxLoadFactor;
long buckets = 1L + (long) (capacity / maxLoadFactor);
buckets = Math.max(1, Long.highestOneBit(buckets - 1) << 1); // next power of two
assert buckets == Long.highestOneBit(buckets);
maxSize = (long) (buckets * maxLoadFactor);
assert maxSize >= capacity;
size = 0;
keys = BigArrays.newLongArray(buckets);
ids = BigArrays.newLongArray(buckets);
mask = buckets - 1;
}
/**
* Return the number of allocated slots to store this hash table.
*/
public long capacity() {
return keys.size();
}
/**
* Return the number of longs in this hash table.
*/
public long size() {
return size;
public LongHash(long capacity, float maxLoadFactor, PageCacheRecycler recycler) {
super(capacity, maxLoadFactor, recycler);
keys = BigArrays.newLongArray(capacity(), recycler, false);
}
private static long hash(long value) {
@ -84,21 +53,6 @@ public final class LongHash {
return MurmurHash3.hash(value);
}
private static long slot(long hash, long mask) {
return hash & mask;
}
private static long nextSlot(long curSlot, long mask) {
return (curSlot + 1) & mask; // linear probing
}
/**
* Get the id associated with the key at <code>0 &lt;= index &lt; capacity()</code> or -1 if this slot is unused.
*/
public long id(long index) {
return ids.get(index) - 1;
}
/**
* Return the key at <code>0 &lt;= index &lt; capacity()</code>. The result is undefined if the slot is unused.
*/
@ -109,12 +63,12 @@ public final class LongHash {
/**
* Get the id associated with <code>key</code>
*/
public long get(long key) {
public long find(long key) {
final long slot = slot(hash(key), mask);
for (long index = slot; ; index = nextSlot(index, mask)) {
final long id = ids.get(index);
if (id == 0L || keys.get(index) == key) {
return id - 1;
final long id = id(index);
if (id == -1 || keys.get(index) == key) {
return id;
}
}
}
@ -123,14 +77,28 @@ public final class LongHash {
assert size < maxSize;
final long slot = slot(hash(key), mask);
for (long index = slot; ; index = nextSlot(index, mask)) {
final long curId = ids.get(index);
if (curId == 0) { // means unset
ids.set(index, id + 1);
final long curId = id(index);
if (curId == -1) { // means unset
id(index, id);
keys.set(index, key);
++size;
return id;
} else if (keys.get(index) == key) {
return - curId;
return -1 - curId;
}
}
}
private void reset(long key, long id) {
final long slot = slot(hash(key), mask);
for (long index = slot; ; index = nextSlot(index, mask)) {
final long curId = id(index);
if (curId == -1) { // means unset
id(index, id);
keys.set(index, key);
break;
} else {
assert keys.get(index) != key;
}
}
}
@ -148,47 +116,27 @@ public final class LongHash {
return set(key, size);
}
private void grow() {
// The difference of this implementation of grow() compared to standard hash tables is that we are growing in-place, which makes
// the re-mapping of keys to slots a bit more tricky.
assert size == maxSize;
final long prevSize = size;
final long buckets = keys.size();
// Resize arrays
final long newBuckets = buckets << 1;
assert newBuckets == Long.highestOneBit(newBuckets) : newBuckets; // power of 2
keys = BigArrays.resize(keys, newBuckets);
ids = BigArrays.resize(ids, newBuckets);
mask = newBuckets - 1;
size = 0;
// First let's remap in-place: most data will be put in its final position directly
for (long i = 0; i < buckets; ++i) {
final long id = ids.set(i, 0);
if (id > 0) {
final long key = keys.set(i, 0);
final long newId = set(key, id - 1);
assert newId == id - 1 : newId + " " + (id - 1);
}
@Override
protected void resizeKeys(long capacity) {
keys = BigArrays.resize(keys, capacity);
}
@Override
protected void removeAndAdd(long index, long id) {
final long key = keys.set(index, 0);
reset(key, id);
}
@Override
public boolean release() {
boolean success = false;
try {
super.release();
success = true;
} finally {
Releasables.release(success, keys);
}
// The only entries which have not been put in their final position in the previous loop are those that were stored in a slot that
// is < slot(key, mask). This only happens when slot(key, mask) returned a slot that was close to the end of the array and collision
// resolution has put it back in the first slots. This time, collision resolution will have put them at the beginning of the newly
// allocated slots. Let's re-add them to make sure they are in the right slot. This 2nd loop will typically exit very early.
for (long i = buckets; i < newBuckets; ++i) {
final long id = ids.set(i, 0);
if (id > 0) {
--size; // we just removed an entry
final long key = keys.set(i, 0);
final long newId = set(key, id - 1); // add it back
assert newId == id - 1 : newId + " " + (id - 1);
assert newId == get(key);
} else {
break;
}
}
assert size == prevSize;
maxSize = (long) (newBuckets * maxLoadFactor);
assert size < maxSize;
return true;
}
}
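LongHash now delegates slot bookkeeping and growth to AbstractHash and renames get to find; a hedged usage sketch of the resulting contract (again assuming a recycler instance):
// Hedged sketch of LongHash usage after this change.
LongHash ords = new LongHash(4, recycler);
long first = ords.add(42L);     // 0: newly inserted
long dup = ords.add(42L);       // -1 - 0 == -1: already present
assert ords.find(42L) == first;
assert ords.find(7L) == -1;     // find() replaces the old get() and returns -1 when absent
ords.release();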

View File

@ -21,18 +21,19 @@ package org.elasticsearch.search.aggregations.bucket.histogram;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.index.fielddata.LongValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.LongHash;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource;
import org.elasticsearch.search.aggregations.support.numeric.ValueFormatter;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import java.io.IOException;
import java.util.ArrayList;
@ -70,7 +71,7 @@ public class HistogramAggregator extends BucketsAggregator {
this.computeEmptyBuckets = computeEmptyBuckets;
this.histogramFactory = histogramFactory;
bucketOrds = new LongHash(initialCapacity);
bucketOrds = new LongHash(initialCapacity, aggregationContext.pageCacheRecycler());
}
@Override
@ -129,7 +130,10 @@ public class HistogramAggregator extends BucketsAggregator {
return histogramFactory.create(name, (List<HistogramBase.Bucket>) Collections.EMPTY_LIST, order, emptyBucketInfo, formatter, keyed);
}
@Override
public void doRelease() {
Releasables.release(bucketOrds);
}
public static class Factory extends ValueSourceAggregatorFactory<NumericValuesSource> {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.index.fielddata.DoubleValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -51,7 +52,7 @@ public class DoubleTermsAggregator extends BucketsAggregator {
this.order = order;
this.requiredSize = requiredSize;
this.shardSize = shardSize;
bucketOrds = new LongHash(estimatedBucketCount);
bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.pageCacheRecycler());
}
@Override
@ -124,4 +125,9 @@ public class DoubleTermsAggregator extends BucketsAggregator {
return new DoubleTerms(name, order, valuesSource.formatter(), requiredSize, Collections.<InternalTerms.Bucket>emptyList());
}
@Override
public void doRelease() {
Releasables.release(bucketOrds);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.index.fielddata.LongValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -51,7 +52,7 @@ public class LongTermsAggregator extends BucketsAggregator {
this.order = order;
this.requiredSize = requiredSize;
this.shardSize = shardSize;
bucketOrds = new LongHash(estimatedBucketCount);
bucketOrds = new LongHash(estimatedBucketCount, aggregationContext.pageCacheRecycler());
}
@Override
@ -123,4 +124,9 @@ public class LongTermsAggregator extends BucketsAggregator {
return new LongTerms(name, order, valuesSource.formatter(), requiredSize, Collections.<InternalTerms.Bucket>emptyList());
}
@Override
public void doRelease() {
Releasables.release(bucketOrds);
}
}

View File

@ -21,7 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
@ -30,6 +30,7 @@ import org.elasticsearch.index.fielddata.ordinals.Ordinals;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.BytesRefHash;
import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.support.AggregationContext;
@ -62,7 +63,7 @@ public class StringTermsAggregator extends BucketsAggregator {
this.requiredSize = requiredSize;
this.shardSize = shardSize;
this.includeExclude = includeExclude;
bucketOrds = new BytesRefHash();
bucketOrds = new BytesRefHash(estimatedBucketCount, aggregationContext.pageCacheRecycler());
}
@Override
@ -83,7 +84,7 @@ public class StringTermsAggregator extends BucketsAggregator {
}
final int hash = values.currentValueHash();
assert hash == bytes.hashCode();
int bucketOrdinal = bucketOrds.add(bytes, hash);
long bucketOrdinal = bucketOrds.add(bytes, hash);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = - 1 - bucketOrdinal;
}
@ -105,7 +106,7 @@ public class StringTermsAggregator extends BucketsAggregator {
@Override
public StringTerms buildAggregation(long owningBucketOrdinal) {
assert owningBucketOrdinal == 0;
final int size = Math.min(bucketOrds.size(), shardSize);
final int size = (int) Math.min(bucketOrds.size(), shardSize);
BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator());
OrdinalBucket spare = null;
@ -133,6 +134,11 @@ public class StringTermsAggregator extends BucketsAggregator {
return new StringTerms(name, order, requiredSize, Collections.<InternalTerms.Bucket>emptyList());
}
@Override
public void doRelease() {
Releasables.release(bucketOrds);
}
/**
* Extension of StringTermsAggregator that caches bucket ords using terms ordinals.
*/
@ -155,7 +161,10 @@ public class StringTermsAggregator extends BucketsAggregator {
ordinals = bytesValues.ordinals();
final long maxOrd = ordinals.getMaxOrd();
if (ordinalToBucket == null || ordinalToBucket.size() < maxOrd) {
ordinalToBucket = BigArrays.newLongArray(BigArrays.overSize(maxOrd));
if (ordinalToBucket != null) {
ordinalToBucket.release();
}
ordinalToBucket = BigArrays.newLongArray(BigArrays.overSize(maxOrd), context().pageCacheRecycler(), false);
}
ordinalToBucket.fill(0, maxOrd, -1L);
}
@ -182,6 +191,11 @@ public class StringTermsAggregator extends BucketsAggregator {
collectBucket(doc, bucketOrd);
}
}
@Override
public void doRelease() {
Releasables.release(bucketOrds, ordinalToBucket);
}
}
}

View File

@ -19,17 +19,18 @@
package org.elasticsearch.search.aggregations.metrics.avg;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.DoubleValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import java.io.IOException;
@ -49,8 +50,8 @@ public class AvgAggregator extends Aggregator {
this.valuesSource = valuesSource;
if (valuesSource != null) {
final long initialSize = estimatedBucketsCount < 2 ? 1 : estimatedBucketsCount;
counts = BigArrays.newLongArray(initialSize);
sums = BigArrays.newDoubleArray(initialSize);
counts = BigArrays.newLongArray(initialSize, context.pageCacheRecycler(), true);
sums = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), true);
}
}
@ -110,4 +111,9 @@ public class AvgAggregator extends Aggregator {
}
}
@Override
public void doRelease() {
Releasables.release(counts, sums);
}
}

View File

@ -19,16 +19,17 @@
package org.elasticsearch.search.aggregations.metrics.max;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.DoubleValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import java.io.IOException;
@ -46,7 +47,7 @@ public class MaxAggregator extends Aggregator {
this.valuesSource = valuesSource;
if (valuesSource != null) {
final long initialSize = estimatedBucketsCount < 2 ? 1 : estimatedBucketsCount;
maxes = BigArrays.newDoubleArray(initialSize);
maxes = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), false);
maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY);
}
}
@ -109,4 +110,9 @@ public class MaxAggregator extends Aggregator {
return new MaxAggregator(name, expectedBucketsCount, valuesSource, aggregationContext, parent);
}
}
@Override
public void doRelease() {
Releasables.release(maxes);
}
}
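The boolean passed to the BigArrays factories in these aggregators is assumed to be clearOnResize: when true (as for counts and sums earlier) new pages come back zeroed, and when false the aggregator seeds the array with its own identity value, as maxes is filled with NEGATIVE_INFINITY above. A hedged sketch of the pattern:
// Hedged sketch of the allocation pattern used by the metric aggregators.
LongArray counts = BigArrays.newLongArray(initialSize, recycler, true);     // zero-filled
DoubleArray maxes = BigArrays.newDoubleArray(initialSize, recycler, false); // uninitialized
maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY);                      // identity for max()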

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.metrics.min;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.DoubleValues;
@ -46,7 +47,7 @@ public class MinAggregator extends Aggregator {
this.valuesSource = valuesSource;
if (valuesSource != null) {
final long initialSize = estimatedBucketsCount < 2 ? 1 : estimatedBucketsCount;
mins = BigArrays.newDoubleArray(initialSize);
mins = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), false);
mins.fill(0, mins.size(), Double.POSITIVE_INFINITY);
}
}
@ -104,4 +105,9 @@ public class MinAggregator extends Aggregator {
return new MinAggregator(name, expectedBucketsCount, valuesSource, aggregationContext, parent);
}
}
@Override
public void doRelease() {
Releasables.release(mins);
}
}

View File

@ -19,17 +19,18 @@
package org.elasticsearch.search.aggregations.metrics.stats;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.DoubleValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import java.io.IOException;
@ -50,11 +51,11 @@ public class StatsAggegator extends Aggregator {
this.valuesSource = valuesSource;
if (valuesSource != null) {
final long initialSize = estimatedBucketsCount < 2 ? 1 : estimatedBucketsCount;
counts = BigArrays.newLongArray(initialSize);
sums = BigArrays.newDoubleArray(initialSize);
mins = BigArrays.newDoubleArray(initialSize);
counts = BigArrays.newLongArray(initialSize, context.pageCacheRecycler(), true);
sums = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), true);
mins = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), false);
mins.fill(0, mins.size(), Double.POSITIVE_INFINITY);
maxes = BigArrays.newDoubleArray(initialSize);
maxes = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), false);
maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY);
}
}
@ -130,4 +131,9 @@ public class StatsAggegator extends Aggregator {
return new StatsAggegator(name, expectedBucketsCount, valuesSource, aggregationContext, parent);
}
}
@Override
public void doRelease() {
Releasables.release(counts, maxes, mins, sums);
}
}
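Note the convention in the clear flags above: counts and sums request cleared pages (true) since zero is their natural starting value, while mins and maxes pass false and are immediately filled with the +Infinity/-Infinity sentinels instead, so clearing those pages first would be wasted work.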
View File
@ -19,17 +19,18 @@
package org.elasticsearch.search.aggregations.metrics.stats.extended;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.DoubleValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import java.io.IOException;
@ -51,13 +52,13 @@ public class ExtendedStatsAggregator extends Aggregator {
this.valuesSource = valuesSource;
if (valuesSource != null) {
final long initialSize = estimatedBucketsCount < 2 ? 1 : estimatedBucketsCount;
counts = BigArrays.newLongArray(initialSize);
sums = BigArrays.newDoubleArray(initialSize);
mins = BigArrays.newDoubleArray(initialSize);
counts = BigArrays.newLongArray(initialSize, context.pageCacheRecycler(), true);
sums = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), true);
mins = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), false);
mins.fill(0, mins.size(), Double.POSITIVE_INFINITY);
maxes = BigArrays.newDoubleArray(initialSize);
maxes = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), false);
maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY);
sumOfSqrs = BigArrays.newDoubleArray(initialSize);
sumOfSqrs = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), true);
}
}
@ -121,6 +122,11 @@ public class ExtendedStatsAggregator extends Aggregator {
return new InternalExtendedStats(name, 0, 0d, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, 0d);
}
@Override
public void doRelease() {
Releasables.release(counts, maxes, mins, sumOfSqrs, sums);
}
public static class Factory extends ValueSourceAggregatorFactory.LeafOnly<NumericValuesSource> {
public Factory(String name, ValuesSourceConfig<NumericValuesSource> valuesSourceConfig) {
View File
@ -19,16 +19,17 @@
package org.elasticsearch.search.aggregations.metrics.sum;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.DoubleValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.numeric.NumericValuesSource;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import java.io.IOException;
@ -46,7 +47,7 @@ public class SumAggregator extends Aggregator {
this.valuesSource = valuesSource;
if (valuesSource != null) {
final long initialSize = estimatedBucketsCount < 2 ? 1 : estimatedBucketsCount;
sums = BigArrays.newDoubleArray(initialSize);
sums = BigArrays.newDoubleArray(initialSize, context.pageCacheRecycler(), true);
}
}
@ -103,4 +104,9 @@ public class SumAggregator extends Aggregator {
return new SumAggregator(name, expectedBucketsCount, valuesSource, aggregationContext, parent);
}
}
@Override
public void doRelease() {
Releasables.release(sums);
}
}
View File
@ -19,16 +19,17 @@
package org.elasticsearch.search.aggregations.metrics.valuecount;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.BytesValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.bytes.BytesValuesSource;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.support.ValueSourceAggregatorFactory;
import java.io.IOException;
@ -51,7 +52,7 @@ public class ValueCountAggregator extends Aggregator {
if (valuesSource != null) {
// expectedBucketsCount == 0 means it's a top level bucket
final long initialSize = expectedBucketsCount < 2 ? 1 : expectedBucketsCount;
counts = BigArrays.newLongArray(initialSize);
counts = BigArrays.newLongArray(initialSize, context.pageCacheRecycler(), true);
}
}
@ -84,6 +85,11 @@ public class ValueCountAggregator extends Aggregator {
return new InternalValueCount(name, 0l);
}
@Override
public void doRelease() {
Releasables.release(counts);
}
public static class Factory extends ValueSourceAggregatorFactory.LeafOnly<BytesValuesSource> {
public Factory(String name, ValuesSourceConfig<BytesValuesSource> valuesSourceBuilder) {
View File
@ -25,6 +25,7 @@ import org.apache.lucene.search.Scorer;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.lucene.ReaderContextAware;
import org.elasticsearch.common.lucene.ScorerAware;
import org.elasticsearch.index.fielddata.IndexFieldData;
@ -67,6 +68,10 @@ public class AggregationContext implements ReaderContextAware, ScorerAware {
return searchContext.cacheRecycler();
}
public PageCacheRecycler pageCacheRecycler() {
return searchContext.pageCacheRecycler();
}
public AtomicReaderContext currentReader() {
return reader;
}
View File
@ -27,6 +27,7 @@ import org.apache.lucene.search.Sort;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.search.AndFilter;
@ -89,6 +90,8 @@ public class DefaultSearchContext extends SearchContext {
private final CacheRecycler cacheRecycler;
private final PageCacheRecycler pageCacheRecycler;
private final IndexShard indexShard;
private final IndexService indexService;
@ -170,7 +173,7 @@ public class DefaultSearchContext extends SearchContext {
public DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, IndexService indexService, IndexShard indexShard,
ScriptService scriptService, CacheRecycler cacheRecycler) {
ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler) {
this.id = id;
this.request = request;
this.searchType = request.searchType();
@ -178,6 +181,7 @@ public class DefaultSearchContext extends SearchContext {
this.engineSearcher = engineSearcher;
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
this.dfsResult = new DfsSearchResult(id, shardTarget);
this.queryResult = new QuerySearchResult(id, shardTarget);
this.fetchResult = new FetchSearchResult(id, shardTarget);
@ -420,6 +424,10 @@ public class DefaultSearchContext extends SearchContext {
return cacheRecycler;
}
public PageCacheRecycler pageCacheRecycler() {
return pageCacheRecycler;
}
public FilterCache filterCache() {
return indexService.cache().filter();
}
View File
@ -23,6 +23,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.analysis.AnalysisService;
@ -175,6 +176,8 @@ public abstract class SearchContext implements Releasable {
public abstract CacheRecycler cacheRecycler();
public abstract PageCacheRecycler pageCacheRecycler();
public abstract FilterCache filterCache();
public abstract DocSetCache docSetCache();
View File
@ -0,0 +1,83 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cache.recycler;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.recycler.Recycler.V;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Random;
public class MockPageCacheRecycler extends PageCacheRecycler {
private final Random random;
@Inject
public MockPageCacheRecycler(Settings settings, ThreadPool threadPool) {
super(settings, threadPool);
final long seed = settings.getAsLong(TestCluster.SETTING_CLUSTER_NODE_SEED, 0L);
random = new Random(seed);
}
@Override
public V<byte[]> bytePage(boolean clear) {
final V<byte[]> page = super.bytePage(clear);
if (!clear) {
random.nextBytes(page.v());
}
return page;
}
@Override
public V<int[]> intPage(boolean clear) {
final V<int[]> page = super.intPage(clear);
if (!clear) {
for (int i = 0; i < page.v().length; ++i) {
page.v()[i] = random.nextInt();
}
}
return page;
}
@Override
public V<long[]> longPage(boolean clear) {
final V<long[]> page = super.longPage(clear);
if (!clear) {
for (int i = 0; i < page.v().length; ++i) {
page.v()[i] = random.nextLong();
}
}
return page;
}
@Override
public V<double[]> doublePage(boolean clear) {
final V<double[]> page = super.doublePage(clear);
if (!clear) {
for (int i = 0; i < page.v().length; ++i) {
page.v()[i] = random.nextDouble() - 0.5;
}
}
return page;
}
}
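MockPageCacheRecycler deliberately fills every page requested with clear == false with random content, so any code exercised by the test cluster that asks for uncleared pages but still assumes zeroed memory will fail loudly. A small illustration of the failure mode it is designed to catch (sketch only, reusing MockPageCacheRecycler, ImmutableSettings, ThreadPool and the BigArrays byte-array API exactly as they appear elsewhere in this commit):

    // Sketch: under the mock recycler, uncleared pages hold random bytes.
    PageCacheRecycler recycler = new MockPageCacheRecycler(ImmutableSettings.EMPTY, new ThreadPool());
    ByteArray bytes = BigArrays.newByteArray(2 * BigArrays.BYTE_PAGE_SIZE, recycler, false);
    byte b = bytes.get(0);   // not necessarily 0: relying on zeroed content here is a bug
    bytes.release();
    // requesting cleared pages (clear == true) or filling explicitly is the correct fix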
View File
@ -45,14 +45,18 @@ public abstract class AbstractRecyclerTests extends ElasticsearchTestCase {
public void testReuse() {
Recycler<byte[]> r = newRecycler();
Recycler.V<byte[]> o = r.obtain();
assertFalse(o.isRecycled());
final byte[] b1 = o.v();
o.release();
o = r.obtain();
final byte[] b2 = o.v();
if (o.isRecycled()) {
assertSame(b1, b2);
} else {
assertNotSame(b1, b2);
}
o.release();
r.close();
}
public void testClear() {
@ -65,6 +69,7 @@ public abstract class AbstractRecyclerTests extends ElasticsearchTestCase {
assertEquals(0, o.v()[i]);
}
o.release();
r.close();
}
public void testDoubleRelease() {
@ -81,6 +86,7 @@ public abstract class AbstractRecyclerTests extends ElasticsearchTestCase {
final Recycler.V<byte[]> v2 = r.obtain();
final Recycler.V<byte[]> v3 = r.obtain();
assertNotSame(v2.v(), v3.v());
r.close();
}
}
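The testReuse change above acknowledges that recyclers are now bounded and implementation-dependent: obtain() after a release() is no longer guaranteed to hand back the same buffer, so the test only asserts identity when V.isRecycled() reports that the value actually came from the pool. Roughly the contract being exercised (sketch, reusing the RECYCLER_C factory and the bounded QueueRecycler constructor from these tests; the limit is an example value):

    Recycler<byte[]> r = new QueueRecycler<byte[]>(RECYCLER_C, 10); // bounded pool
    Recycler.V<byte[]> v = r.obtain();       // fresh value: v.isRecycled() == false
    byte[] first = v.v();
    v.release();                             // may be pooled, depending on impl and limit
    v = r.obtain();
    if (v.isRecycled()) {
        assert first == v.v();               // a recycled value is handed back as-is
    }
    v.release();
    r.close();                               // tests now close recyclers explicitly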
View File
@ -23,7 +23,7 @@ public class QueueRecyclerTests extends AbstractRecyclerTests {
@Override
protected Recycler<byte[]> newRecycler() {
return new QueueRecycler<byte[]>(RECYCLER_C);
return new QueueRecycler<byte[]>(RECYCLER_C, randomIntBetween(5, 10));
}
}
View File
@ -19,16 +19,41 @@
package org.elasticsearch.common.util;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cache.recycler.MockPageCacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
public class BigArraysTests extends ElasticsearchTestCase {
public static PageCacheRecycler randomCacheRecycler() {
return randomBoolean() ? null : new MockPageCacheRecycler(ImmutableSettings.EMPTY, new ThreadPool());
}
public void testByteArrayGrowth() {
final int totalLen = randomIntBetween(1, 4000000);
final int startLen = randomIntBetween(1, randomBoolean() ? 1000 : totalLen);
ByteArray array = BigArrays.newByteArray(startLen, randomCacheRecycler(), randomBoolean());
byte[] ref = new byte[totalLen];
for (int i = 0; i < totalLen; ++i) {
ref[i] = randomByte();
array = BigArrays.grow(array, i + 1);
array.set(i, ref[i]);
}
for (int i = 0; i < totalLen; ++i) {
assertEquals(ref[i], array.get(i));
}
array.release();
}
public void testIntArrayGrowth() {
final int totalLen = randomIntBetween(1, 1000000);
final int startLen = randomIntBetween(1, randomBoolean() ? 1000 : totalLen);
IntArray array = BigArrays.newIntArray(startLen);
IntArray array = BigArrays.newIntArray(startLen, randomCacheRecycler(), randomBoolean());
int[] ref = new int[totalLen];
for (int i = 0; i < totalLen; ++i) {
ref[i] = randomInt();
@ -38,12 +63,13 @@ public class BigArraysTests extends ElasticsearchTestCase {
for (int i = 0; i < totalLen; ++i) {
assertEquals(ref[i], array.get(i));
}
array.release();
}
public void testLongArrayGrowth() {
final int totalLen = randomIntBetween(1, 1000000);
final int startLen = randomIntBetween(1, randomBoolean() ? 1000 : totalLen);
LongArray array = BigArrays.newLongArray(startLen);
LongArray array = BigArrays.newLongArray(startLen, randomCacheRecycler(), randomBoolean());
long[] ref = new long[totalLen];
for (int i = 0; i < totalLen; ++i) {
ref[i] = randomLong();
@ -53,12 +79,13 @@ public class BigArraysTests extends ElasticsearchTestCase {
for (int i = 0; i < totalLen; ++i) {
assertEquals(ref[i], array.get(i));
}
array.release();
}
public void testDoubleArrayGrowth() {
final int totalLen = randomIntBetween(1, 1000000);
final int startLen = randomIntBetween(1, randomBoolean() ? 1000 : totalLen);
DoubleArray array = BigArrays.newDoubleArray(startLen);
DoubleArray array = BigArrays.newDoubleArray(startLen, randomCacheRecycler(), randomBoolean());
double[] ref = new double[totalLen];
for (int i = 0; i < totalLen; ++i) {
ref[i] = randomDouble();
@ -68,12 +95,13 @@ public class BigArraysTests extends ElasticsearchTestCase {
for (int i = 0; i < totalLen; ++i) {
assertEquals(ref[i], array.get(i), 0.001d);
}
array.release();
}
public void testObjectArrayGrowth() {
final int totalLen = randomIntBetween(1, 1000000);
final int startLen = randomIntBetween(1, randomBoolean() ? 1000 : totalLen);
ObjectArray<Object> array = BigArrays.newObjectArray(startLen);
ObjectArray<Object> array = BigArrays.newObjectArray(startLen, randomCacheRecycler());
final Object[] pool = new Object[100];
for (int i = 0; i < pool.length; ++i) {
pool[i] = new Object();
@ -87,6 +115,7 @@ public class BigArraysTests extends ElasticsearchTestCase {
for (int i = 0; i < totalLen; ++i) {
assertSame(ref[i], array.get(i));
}
array.release();
}
public void testDoubleArrayFill() {
@ -95,7 +124,7 @@ public class BigArraysTests extends ElasticsearchTestCase {
final int toIndex = randomBoolean()
? Math.min(fromIndex + randomInt(100), len) // single page
: randomIntBetween(fromIndex, len); // likely multiple pages
final DoubleArray array2 = BigArrays.newDoubleArray(len);
final DoubleArray array2 = BigArrays.newDoubleArray(len, randomCacheRecycler(), randomBoolean());
final double[] array1 = new double[len];
for (int i = 0; i < len; ++i) {
array1[i] = randomDouble();
@ -107,6 +136,7 @@ public class BigArraysTests extends ElasticsearchTestCase {
for (int i = 0; i < len; ++i) {
assertEquals(array1[i], array2.get(i), 0.001d);
}
array2.release();
}
public void testLongArrayFill() {
@ -115,7 +145,7 @@ public class BigArraysTests extends ElasticsearchTestCase {
final int toIndex = randomBoolean()
? Math.min(fromIndex + randomInt(100), len) // single page
: randomIntBetween(fromIndex, len); // likely multiple pages
final LongArray array2 = BigArrays.newLongArray(len);
final LongArray array2 = BigArrays.newLongArray(len, randomCacheRecycler(), randomBoolean());
final long[] array1 = new long[len];
for (int i = 0; i < len; ++i) {
array1[i] = randomLong();
@ -127,6 +157,39 @@ public class BigArraysTests extends ElasticsearchTestCase {
for (int i = 0; i < len; ++i) {
assertEquals(array1[i], array2.get(i));
}
array2.release();
}
public void testByteArrayBulkGet() {
final byte[] array1 = new byte[randomIntBetween(1, 4000000)];
getRandom().nextBytes(array1);
final ByteArray array2 = BigArrays.newByteArray(array1.length, randomCacheRecycler(), randomBoolean());
for (int i = 0; i < array1.length; ++i) {
array2.set(i, array1[i]);
}
final BytesRef ref = new BytesRef();
for (int i = 0; i < 1000; ++i) {
final int offset = randomInt(array1.length - 1);
final int len = randomInt(Math.min(randomBoolean() ? 10 : Integer.MAX_VALUE, array1.length - offset));
array2.get(offset, len, ref);
assertEquals(new BytesRef(array1, offset, len), ref);
}
array2.release();
}
public void testByteArrayBulkSet() {
final byte[] array1 = new byte[randomIntBetween(1, 4000000)];
getRandom().nextBytes(array1);
final ByteArray array2 = BigArrays.newByteArray(array1.length, randomCacheRecycler(), randomBoolean());
for (int i = 0; i < array1.length; ) {
final int len = Math.min(array1.length - i, randomBoolean() ? randomInt(10) : randomInt(3 * BigArrays.BYTE_PAGE_SIZE));
array2.set(i, array1, i, len);
i += len;
}
for (int i = 0; i < array1.length; ++i) {
assertEquals(array1[i], array2.get(i));
}
array2.release();
}
}
View File
@ -30,6 +30,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.lucene.search.NotFilter;
import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
@ -326,6 +327,7 @@ public class ChildrenConstantScoreQueryTests extends ElasticsearchLuceneTestCase
final Index index = new Index(indexName);
final IdCache idCache = new SimpleIdCache(index, ImmutableSettings.EMPTY);
final CacheRecycler cacheRecycler = new CacheRecycler(ImmutableSettings.EMPTY);
final PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(ImmutableSettings.EMPTY, new ThreadPool());
Settings settings = ImmutableSettings.EMPTY;
MapperService mapperService = MapperTestUtils.newMapperService(index, settings);
final IndexService indexService = new SimpleIdCacheTests.StubIndexService(mapperService);
@ -339,7 +341,7 @@ public class ChildrenConstantScoreQueryTests extends ElasticsearchLuceneTestCase
NodeSettingsService nodeSettingsService = new NodeSettingsService(settings);
IndicesFilterCache indicesFilterCache = new IndicesFilterCache(settings, threadPool, cacheRecycler, nodeSettingsService);
WeightedFilterCache filterCache = new WeightedFilterCache(index, settings, indicesFilterCache);
return new TestSearchContext(cacheRecycler, idCache, indexService, filterCache);
return new TestSearchContext(cacheRecycler, pageCacheRecycler, idCache, indexService, filterCache);
}
}
View File
@ -24,6 +24,7 @@ import org.apache.lucene.search.Sort;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.docset.DocSetCache;
@ -64,6 +65,7 @@ import java.util.List;
class TestSearchContext extends SearchContext {
final CacheRecycler cacheRecycler;
final PageCacheRecycler pageCacheRecycler;
final IdCache idCache;
final IndexService indexService;
final FilterCache filterCache;
@ -71,8 +73,9 @@ class TestSearchContext extends SearchContext {
ContextIndexSearcher searcher;
int size;
TestSearchContext(CacheRecycler cacheRecycler, IdCache idCache, IndexService indexService, FilterCache filterCache) {
TestSearchContext(CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, IdCache idCache, IndexService indexService, FilterCache filterCache) {
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
this.idCache = idCache;
this.indexService = indexService;
this.filterCache = filterCache;
@ -293,6 +296,11 @@ class TestSearchContext extends SearchContext {
return cacheRecycler;
}
@Override
public PageCacheRecycler pageCacheRecycler() {
return pageCacheRecycler;
}
@Override
public FilterCache filterCache() {
return filterCache;
View File
@ -0,0 +1,246 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.ObjectLongOpenHashMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util._TestUtil;
import org.elasticsearch.common.util.BigArraysTests;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.*;
import java.util.Map.Entry;
public class BytesRefHashTests extends ElasticsearchTestCase {
BytesRefHash hash;
private void newHash() {
// Test high load factors to make sure that collision resolution works fine
final float maxLoadFactor = 0.6f + randomFloat() * 0.39f;
hash = new BytesRefHash(randomIntBetween(0, 100), maxLoadFactor, BigArraysTests.randomCacheRecycler());
}
@Override
public void setUp() throws Exception {
super.setUp();
newHash();
}
public void testDuell() {
final BytesRef[] values = new BytesRef[randomIntBetween(1, 100000)];
for (int i = 0; i < values.length; ++i) {
values[i] = new BytesRef(randomAsciiOfLength(5));
}
final ObjectLongMap<BytesRef> valueToId = new ObjectLongOpenHashMap<BytesRef>();
final BytesRef[] idToValue = new BytesRef[values.length];
final int iters = randomInt(1000000);
for (int i = 0; i < iters; ++i) {
final BytesRef value = randomFrom(values);
if (valueToId.containsKey(value)) {
assertEquals(- 1 - valueToId.get(value), hash.add(value, value.hashCode()));
} else {
assertEquals(valueToId.size(), hash.add(value, value.hashCode()));
idToValue[valueToId.size()] = value;
valueToId.put(value, valueToId.size());
}
}
assertEquals(valueToId.size(), hash.size());
for (Iterator<ObjectLongCursor<BytesRef>> iterator = valueToId.iterator(); iterator.hasNext(); ) {
final ObjectLongCursor<BytesRef> next = iterator.next();
assertEquals(next.value, hash.find(next.key, next.key.hashCode()));
}
for (long i = 0; i < hash.capacity(); ++i) {
final long id = hash.id(i);
BytesRef spare = new BytesRef();
if (id >= 0) {
hash.get(id, spare);
assertEquals(idToValue[(int) id], spare);
}
}
hash.release();
}
// START - tests borrowed from LUCENE
/**
* Test method for {@link org.apache.lucene.util.BytesRefHash#size()}.
*/
@Test
public void testSize() {
BytesRef ref = new BytesRef();
int num = atLeast(2);
for (int j = 0; j < num; j++) {
final int mod = 1+randomInt(40);
for (int i = 0; i < 797; i++) {
String str;
do {
str = _TestUtil.randomRealisticUnicodeString(getRandom(), 1000);
} while (str.length() == 0);
ref.copyChars(str);
long count = hash.size();
long key = hash.add(ref);
if (key < 0)
assertEquals(hash.size(), count);
else
assertEquals(hash.size(), count + 1);
if(i % mod == 0) {
newHash();
}
}
}
}
/**
* Test method for
* {@link org.apache.lucene.util.BytesRefHash#get(int, BytesRef)}
* .
*/
@Test
public void testGet() {
BytesRef ref = new BytesRef();
BytesRef scratch = new BytesRef();
int num = atLeast(2);
for (int j = 0; j < num; j++) {
Map<String, Long> strings = new HashMap<String, Long>();
int uniqueCount = 0;
for (int i = 0; i < 797; i++) {
String str;
do {
str = _TestUtil.randomRealisticUnicodeString(getRandom(), 1000);
} while (str.length() == 0);
ref.copyChars(str);
long count = hash.size();
long key = hash.add(ref);
if (key >= 0) {
assertNull(strings.put(str, Long.valueOf(key)));
assertEquals(uniqueCount, key);
uniqueCount++;
assertEquals(hash.size(), count + 1);
} else {
assertTrue((-key)-1 < count);
assertEquals(hash.size(), count);
}
}
for (Entry<String, Long> entry : strings.entrySet()) {
ref.copyChars(entry.getKey());
assertEquals(ref, hash.get(entry.getValue().longValue(), scratch));
}
newHash();
}
}
/**
* Test method for
* {@link org.apache.lucene.util.BytesRefHash#add(org.apache.lucene.util.BytesRef)}
* .
*/
@Test
public void testAdd() {
BytesRef ref = new BytesRef();
BytesRef scratch = new BytesRef();
int num = atLeast(2);
for (int j = 0; j < num; j++) {
Set<String> strings = new HashSet<String>();
int uniqueCount = 0;
for (int i = 0; i < 797; i++) {
String str;
do {
str = _TestUtil.randomRealisticUnicodeString(getRandom(), 1000);
} while (str.length() == 0);
ref.copyChars(str);
long count = hash.size();
long key = hash.add(ref);
if (key >=0) {
assertTrue(strings.add(str));
assertEquals(uniqueCount, key);
assertEquals(hash.size(), count + 1);
uniqueCount++;
} else {
assertFalse(strings.add(str));
assertTrue((-key)-1 < count);
assertEquals(str, hash.get((-key)-1, scratch).utf8ToString());
assertEquals(count, hash.size());
}
}
assertAllIn(strings, hash);
newHash();
}
}
@Test
public void testFind() throws Exception {
BytesRef ref = new BytesRef();
BytesRef scratch = new BytesRef();
int num = atLeast(2);
for (int j = 0; j < num; j++) {
Set<String> strings = new HashSet<String>();
int uniqueCount = 0;
for (int i = 0; i < 797; i++) {
String str;
do {
str = _TestUtil.randomRealisticUnicodeString(getRandom(), 1000);
} while (str.length() == 0);
ref.copyChars(str);
long count = hash.size();
long key = hash.find(ref); //hash.add(ref);
if (key >= 0) { // string found in hash
assertFalse(strings.add(str));
assertTrue(key < count);
assertEquals(str, hash.get(key, scratch).utf8ToString());
assertEquals(count, hash.size());
} else {
key = hash.add(ref);
assertTrue(strings.add(str));
assertEquals(uniqueCount, key);
assertEquals(hash.size(), count + 1);
uniqueCount++;
}
}
assertAllIn(strings, hash);
newHash();
}
}
private void assertAllIn(Set<String> strings, BytesRefHash hash) {
BytesRef ref = new BytesRef();
BytesRef scratch = new BytesRef();
long count = hash.size();
for (String string : strings) {
ref.copyChars(string);
long key = hash.add(ref); // add again to check duplicates
assertEquals(string, hash.get((-key)-1, scratch).utf8ToString());
assertEquals(count, hash.size());
assertTrue("key: " + key + " count: " + count + " string: " + string,
key < count);
}
}
// END - tests borrowed from LUCENE
}
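Taken together, the tests above pin down the BytesRefHash contract: add() returns a fresh id for an unseen key and -1 - id when the key already exists, find() returns the existing id, get() resolves an id back into a BytesRef, id() exposes the underlying slot table, and release() returns the pages to the recycler. A compact usage sketch (illustrative; the constructor arguments mirror newHash() above):

    BytesRefHash hash = new BytesRefHash(16, 0.75f, BigArraysTests.randomCacheRecycler());
    BytesRef key = new BytesRef("foo");
    long id = hash.add(key, key.hashCode());     // >= 0: newly inserted
    long again = hash.add(key, key.hashCode());  // == -1 - id: key was already present
    long found = hash.find(key, key.hashCode()); // == id
    BytesRef scratch = new BytesRef();
    hash.get(id, scratch);                       // scratch now holds "foo"
    hash.release();                              // give the pages back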
View File
@ -22,7 +22,7 @@ package org.elasticsearch.search.aggregations.bucket;
import com.carrotsearch.hppc.LongLongMap;
import com.carrotsearch.hppc.LongLongOpenHashMap;
import com.carrotsearch.hppc.cursors.LongLongCursor;
import org.elasticsearch.search.aggregations.bucket.LongHash;
import org.elasticsearch.common.util.BigArraysTests;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.util.Iterator;
@ -38,7 +38,7 @@ public class LongHashTests extends ElasticsearchTestCase {
final long[] idToValue = new long[values.length];
// Test high load factors to make sure that collision resolution works fine
final float maxLoadFactor = 0.6f + randomFloat() * 0.39f;
final LongHash longHash = new LongHash(randomIntBetween(0, 100), maxLoadFactor);
final LongHash longHash = new LongHash(randomIntBetween(0, 100), maxLoadFactor, BigArraysTests.randomCacheRecycler());
final int iters = randomInt(1000000);
for (int i = 0; i < iters; ++i) {
final Long value = randomFrom(values);
@ -54,7 +54,7 @@ public class LongHashTests extends ElasticsearchTestCase {
assertEquals(valueToId.size(), longHash.size());
for (Iterator<LongLongCursor> iterator = valueToId.iterator(); iterator.hasNext(); ) {
final LongLongCursor next = iterator.next();
assertEquals(next.value, longHash.get(next.key));
assertEquals(next.value, longHash.find(next.key));
}
for (long i = 0; i < longHash.capacity(); ++i) {
@ -63,6 +63,7 @@ public class LongHashTests extends ElasticsearchTestCase {
assertEquals(idToValue[(int) id], longHash.key(i));
}
}
longHash.release();
}
}
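LongHash now follows the same shape: the lookup method has been renamed from get() to find(), the constructor takes a PageCacheRecycler, and instances are released when done. A usage sketch under the same assumptions (id() per slot is inferred from the elided loop body above):

    LongHash hash = new LongHash(16, 0.75f, BigArraysTests.randomCacheRecycler());
    long id = hash.add(42L);       // id of the newly inserted key
    long found = hash.find(42L);   // same id; negative when the key is absent
    for (long slot = 0; slot < hash.capacity(); ++slot) {
        if (hash.id(slot) >= 0) {              // occupied slot
            long key = hash.key(slot);         // key stored at this slot
        }
    }
    hash.release();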
View File
@ -27,6 +27,7 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecyclerModule;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.client.transport.TransportClient;
@ -48,9 +49,11 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.engine.IndexEngineModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
import org.elasticsearch.test.engine.MockEngineModule;
import org.elasticsearch.test.store.MockFSIndexStoreModule;
import org.elasticsearch.test.transport.AssertingLocalTransportModule;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
@ -216,6 +219,7 @@ public final class TestCluster implements Iterable<Client> {
if (ENABLE_MOCK_MODULES && usually(random)) {
builder.put("index.store.type", MockFSIndexStoreModule.class.getName()); // no RAM dir for now!
builder.put(IndexEngineModule.EngineSettings.ENGINE_TYPE, MockEngineModule.class.getName());
builder.put(PageCacheRecyclerModule.CACHE_IMPL, MockPageCacheRecyclerModule.class.getName());
}
if (isLocalTransportConfigured()) {
builder.put(TransportModule.TRANSPORT_TYPE_KEY, AssertingLocalTransportModule.class.getName());
@ -223,6 +227,21 @@ public final class TestCluster implements Iterable<Client> {
builder.put(Transport.TransportSettings.TRANSPORT_TCP_COMPRESS, rarely(random));
}
builder.put("type", RandomPicks.randomFrom(random, CacheRecycler.Type.values()));
if (random.nextBoolean()) {
builder.put("cache.recycler.page.type", RandomPicks.randomFrom(random, CacheRecycler.Type.values()));
}
if (random.nextBoolean()) {
// change threadpool types to make sure we don't have components that rely on the type of thread pools
for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GENERIC, ThreadPool.Names.GET,
ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE,
ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT,
ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) {
if (random.nextBoolean()) {
final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling"));
builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type);
}
}
}
return builder.build();
}
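On the test-cluster side, two more things are now randomized per run: the page recycler implementation (cache.recycler.page.type) and, sometimes, the thread pool types, so that no component can silently depend on a particular choice of either. For reference, the equivalent explicit overrides would look roughly like this (sketch; the constants come from the code above, the values are examples and the exact accepted spellings are an assumption):

    Settings settings = ImmutableSettings.settingsBuilder()
            .put(PageCacheRecyclerModule.CACHE_IMPL, MockPageCacheRecyclerModule.class.getName()) // mock recycler impl
            .put("cache.recycler.page.type", "thread_local")                                      // per-thread page pools
            .put(ThreadPool.THREADPOOL_GROUP + ThreadPool.Names.SEARCH + ".type", "fixed")        // thread pool type override
            .build();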
View File
@ -0,0 +1,33 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.cache.recycler;
import org.elasticsearch.cache.recycler.MockPageCacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.inject.AbstractModule;
public class MockPageCacheRecyclerModule extends AbstractModule {
@Override
protected void configure() {
bind(PageCacheRecycler.class).to(MockPageCacheRecycler.class).asEagerSingleton();
}
}