remove dead code; get one test working again; fix docs; remove nocommits

This commit is contained in:
Michael McCandless 2015-12-16 16:19:07 -05:00 committed by mikemccand
parent ed5c0e7f13
commit 319dc8c8ed
12 changed files with 98 additions and 133 deletions

View File

@ -125,18 +125,6 @@ final class CompositeIndexEventListener implements IndexEventListener {
}
}
@Override
public void onShardActive(IndexShard indexShard) {
for (IndexEventListener listener : listeners) {
try {
listener.onShardActive(indexShard);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke on shard active callback", t, indexShard.shardId().getId());
throw t;
}
}
}
@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {
for (IndexEventListener listener : listeners) {

View File

@ -19,17 +19,6 @@
package org.elasticsearch.index.engine;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
@ -61,6 +50,17 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
/**
*
*/

View File

@ -89,11 +89,6 @@ public final class EngineConfig {
*/
public static final String INDEX_CODEC_SETTING = "index.codec";
/**
* Index setting to control the index buffer size.
* This setting is <b>not</b> realtime updateable.
*/
/** if set to true the engine will start even if the translog id in the commit point can not be found */
public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
@ -128,7 +123,8 @@ public final class EngineConfig {
this.eventListener = eventListener;
this.compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = settings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
// We give IndexWriter a huge buffer, so it won't flush on its own. Instead, IndexingMemoryController periodically checks
// We give IndexWriter a "huge" (256 MB) buffer, so it won't flush on its own unless the ES indexing buffer is also huge and/or
// there are not too many shards allocated to this node. Instead, IndexingMemoryController periodically checks
// and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high:
indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();

View File

@ -103,6 +103,11 @@ public class InternalEngine extends Engine {
private final IndexThrottle throttle;
// How many callers are currently requesting index throttling. Currently there are only two times we do this: when merges are falling
// behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling incoming
// indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
public InternalEngine(EngineConfig engineConfig, boolean skipInitialTranslogRecovery) throws EngineException {
super(engineConfig);
this.versionMap = new LiveVersionMap();
@ -516,12 +521,11 @@ public class InternalEngine extends Engine {
long versionMapBytes = versionMap.ramBytesUsedForRefresh();
long indexingBufferBytes = indexWriter.ramBytesUsed();
boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes);
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes/4 < versionMapBytes);
if (useRefresh) {
// The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
@ -542,15 +546,6 @@ public class InternalEngine extends Engine {
failEngine("writeIndexingBuffer failed", t);
throw new RefreshFailedEngineException(shardId, t);
}
// TODO: maybe we should just put a scheduled job in threadPool?
// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
// for a long time:
if (useRefresh) {
maybePruneDeletedTombstones();
versionMapRefreshPending.set(false);
mergeScheduler.refreshConfig();
}
}
@Override
@ -1051,8 +1046,6 @@ public class InternalEngine extends Engine {
}
}
private final AtomicInteger throttleRequestCount = new AtomicInteger();
@Override
public void activateThrottling() {
int count = throttleRequestCount.incrementAndGet();

View File

@ -70,7 +70,6 @@ public interface IndexEventListener {
*/
default void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {}
/**
* Called after a shard's {@link org.elasticsearch.index.shard.IndexShardState} changes.
* The order of concurrent events is preserved. The execution must be lightweight.
@ -89,13 +88,6 @@ public interface IndexEventListener {
*/
default void onShardInactive(IndexShard indexShard) {}
/**
* Called when a shard is marked as active ie. was previously inactive and is now active again.
*
* @param indexShard The shard that was marked active
*/
default void onShardActive(IndexShard indexShard) {}
/**
* Called before the index gets created. Note that this is also called
* when the index is created on data nodes

View File

@ -488,7 +488,10 @@ public class IndexShard extends AbstractIndexShardComponent {
throw ex;
}
indexingService.postIndex(index);
// Notify IMC so that it can go and check heap used by all indexing buffers periodically:
indexingMemoryController.bytesWritten(index.getTranslogLocation().size);
return created;
}
@ -525,6 +528,8 @@ public class IndexShard extends AbstractIndexShardComponent {
throw ex;
}
indexingService.postDelete(delete);
// Notify IMC so that it can go and check heap used by all indexing buffers periodically:
indexingMemoryController.bytesWritten(delete.getTranslogLocation().size);
}
@ -533,13 +538,13 @@ public class IndexShard extends AbstractIndexShardComponent {
return getEngine().get(get, this::acquireSearcher);
}
/** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */
public void refresh(String source) {
verifyNotClosed();
// nocommit OK to throw EngineClosedExc?
long ramBytesUsed = getEngine().indexBufferRAMBytesUsed();
indexingMemoryController.addWritingBytes(this, ramBytesUsed);
try {
logger.debug("refresh with source: {} indexBufferRAMBytesUsed={}", source, ramBytesUsed);
logger.debug("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(ramBytesUsed));
long time = System.nanoTime();
getEngine().refresh(source);
refreshMetric.inc(System.nanoTime() - time);
@ -1019,14 +1024,6 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
/**
* Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
* IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING} (default 5 minutes), else {@code false}.
*/
public boolean getActive() {
return active.get();
}
public final boolean isFlushOnClose() {
return flushOnClose;
}
@ -1226,7 +1223,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private void handleRefreshException(Exception e) {
if (e instanceof EngineClosedException) {
// ignore
} else if (e instanceof RefreshFailedEngineException e) {
} else if (e instanceof RefreshFailedEngineException) {
RefreshFailedEngineException rfee = (RefreshFailedEngineException) e;
if (rfee.getCause() instanceof InterruptedException) {
// ignore, we are being shutdown

View File

@ -119,7 +119,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
SearchService searchService, SyncedFlushService syncedFlushService,
RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider, IndexingMemoryController indexingMemoryController) {
super(settings);
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService, indexingMemoryController);
this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;

View File

@ -34,15 +34,13 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
// nocommit what is IndexEventListener
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> implements IndexEventListener {
public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
/** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
@ -136,9 +134,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
/** Shard calls when it's done writing these bytes to disk */
public void removeWritingBytes(IndexShard shard, long numBytes) {
// nocommit this can fail, if two refreshes are running "concurrently"
Long result = writingBytes.remove(shard);
//assert result != null;
writingBytes.remove(shard);
logger.debug("IMC: clear writing bytes for {}", shard.shardId());
}
@ -200,8 +196,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
statusChecker.run();
}
long startMS = System.currentTimeMillis();
/** called by IndexShard to record that this many bytes were written to translog */
public void bytesWritten(int bytes) {
statusChecker.bytesWritten(bytes);

View File

@ -105,7 +105,6 @@ public class IndexModuleTests extends ESTestCase {
scriptEngines.addAll(Arrays.asList(scriptEngineServices));
ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), new ScriptContextRegistry(Collections.emptyList()));
IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, Collections.emptySet(), new NamedWriteableRegistry());
// nocommit null:
IndexingMemoryController indexingMemoryController = new IndexingMemoryController(settings, threadPool, null);
return new NodeServicesProvider(threadPool, indicesQueryCache, null, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, indicesFieldDataCache, circuitBreakerService, indexingMemoryController);
}

View File

@ -1563,58 +1563,6 @@ public class InternalEngineTests extends ESTestCase {
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
}
// #10312
// ncommit get this working again
/*
public void testDeletesAloneCanTriggerRefresh() throws Exception {
// nocommit need to set buffer up front again?
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
engine.config().setIndexingBufferSize(new ByteSizeValue(1, ByteSizeUnit.KB));
for (int i = 0; i < 100; i++) {
String id = Integer.toString(i);
ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null);
engine.index(new Engine.Index(newUid(id), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
}
// Force merge so we know all merges are done before we start deleting:
engine.forceMerge(true, 1, false, false, false);
// Make a shell of an IMC to check up on indexing buffer usage:
IndexingMemoryController imc = new IndexingMemoryController(settings, threadPool, null) {
@Override
protected List<IndexShard> availableShards() {
return Collections.singletonList(new ShardId("foo", 0));
}
@Override
protected void refreshShardAsync(IndexShard shard) {
engine.refresh("memory");
}
@Override
protected long getIndexBufferRAMBytesUsed(ShardId shardId) {
System.out.println("BYTES USED: " + engine.indexBufferRAMBytesUsed());
return engine.indexBufferRAMBytesUsed();
}
};
Searcher s = engine.acquireSearcher("test");
final long version1 = ((DirectoryReader) s.reader()).getVersion();
s.close();
for (int i = 0; i < 100; i++) {
String id = Integer.toString(i);
engine.delete(new Engine.Delete("test", id, newUid(id), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
}
imc.forceCheck();
try (Searcher s2 = engine.acquireSearcher("test")) {
assertThat(((DirectoryReader) s2.reader()).getVersion(), greaterThan(version1));
}
}
}
*/
public void testMissingTranslog() throws IOException {
// test that we can force start the engine , even if the translog is missing.
engine.close();

View File

@ -18,19 +18,23 @@
*/
package org.elasticsearch.indices.memory;
import java.util.*;
import org.apache.lucene.index.DirectoryReader;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
@ -183,4 +187,65 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
assertThat(controller.indexingBufferSize(), equalTo(new ByteSizeValue(6, ByteSizeUnit.MB)));
}
// #10312
public void testDeletesAloneCanTriggerRefresh() throws Exception {
createIndex("index",
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
.build());
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("index");
IndexShard shard = indexService.getShardOrNull(0);
assertNotNull(shard);
for (int i = 0; i < 100; i++) {
String id = Integer.toString(i);
client().prepareIndex("index", "type", id).setSource("field", "value").get();
}
// Force merge so we know all merges are done before we start deleting:
ForceMergeResponse r = client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
assertNoFailures(r);
// Make a shell of an IMC to check up on indexing buffer usage:
Settings settings = Settings.builder().put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
// TODO: would be cleaner if I could pass this 1kb setting to the single node this test created....
IndexingMemoryController imc = new IndexingMemoryController(settings, null, null) {
@Override
protected List<IndexShard> availableShards() {
return Collections.singletonList(shard);
}
@Override
protected long getIndexBufferRAMBytesUsed(IndexShard shard) {
return shard.getIndexBufferRAMBytesUsed();
}
};
for (int i = 0; i < 100; i++) {
String id = Integer.toString(i);
client().prepareDelete("index", "type", id).get();
}
final long indexingBufferBytes1 = shard.getIndexBufferRAMBytesUsed();
imc.forceCheck();
// We must assertBusy because the writeIndexingBufferAsync is done in background (REFRESH) thread pool:
assertBusy(new Runnable() {
@Override
public void run() {
try (Engine.Searcher s2 = shard.acquireSearcher("index")) {
// 100 buffered deletes will easily exceed our 1 KB indexing buffer so it should trigger a write:
final long indexingBufferBytes2 = shard.getIndexBufferRAMBytesUsed();
assertTrue(indexingBufferBytes2 < indexingBufferBytes1);
}
}
});
}
}

View File

@ -12,7 +12,7 @@ in the cluster:
Accepts either a percentage or a byte size value. It defaults to `10%`,
meaning that `10%` of the total heap allocated to a node will be used as the
indexing buffer size.
indexing buffer size shared across all shards.
`indices.memory.min_index_buffer_size`::
@ -23,10 +23,3 @@ in the cluster:
If the `index_buffer_size` is specified as a percentage, then this
setting can be used to specify an absolute maximum. Defaults to unbounded.
`indices.memory.min_shard_index_buffer_size`::
Sets a hard lower limit for the memory allocated per shard for its own
indexing buffer. Defaults to `4mb`.