fix failing test

Michael McCandless 2015-10-14 09:11:31 -04:00 committed by mikemccand
parent 6ae8ca9a5e
commit 1b9e9ed092
4 changed files with 37 additions and 22 deletions

MetaDataIndexUpgradeService.java

@@ -125,8 +125,7 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
         "index.store.throttle.max_bytes_per_sec",
         "index.translog.flush_threshold_size",
         "index.translog.fs.buffer_size",
-        "index.version_map_size",
-        "index.buffer_size"));
+        "index.version_map_size"));
 
     /** All known time settings for an index. */
     public static final Set<String> INDEX_TIME_SETTINGS = unmodifiableSet(newHashSet(

EngineConfig.java

@@ -97,7 +97,6 @@ public final class EngineConfig {
      * Index setting to control the index buffer size.
      * This setting is <b>not</b> realtime updateable.
      */
-    public static final String INDEX_BUFFER_SIZE_SETTING = "index.buffer_size";
 
     /** if set to true the engine will start even if the translog id in the commit point can not be found */
     public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
@@ -132,9 +131,9 @@ public final class EngineConfig {
         this.failedEngineListener = failedEngineListener;
         this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
         codecName = indexSettings.get(INDEX_CODEC_SETTING, DEFAULT_CODEC_NAME);
-        // We tell IndexWriter to use large heap, but IndexingMemoryController checks periodically and refreshes the most heap-consuming
-        // shards when total indexing heap usage is too high:
-        indexingBufferSize = indexSettings.getAsBytesSize(INDEX_BUFFER_SIZE_SETTING, new ByteSizeValue(256, ByteSizeUnit.MB));
+        // We give IndexWriter a huge buffer, so it won't flush on its own. Instead, IndexingMemoryController periodically checks
+        // and refreshes the most heap-consuming shards when total indexing heap usage is too high:
+        indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
         gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, DEFAULT_GC_DELETES).millis();
         this.translogRecoveryPerformer = translogRecoveryPerformer;
         this.forceNewTranslog = indexSettings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false);

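Note: the hunk above is the core behavioral change. EngineConfig no longer reads a per-index index.buffer_size setting; every engine now reports a fixed 256 MB indexing buffer, and the actual heap budget is enforced centrally by IndexingMemoryController. A minimal sketch of the underlying idea, using Lucene's IndexWriterConfig directly (the analyzer and directory are generic placeholders, not Elasticsearch's actual wiring):

import java.io.IOException;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;

public class FixedBufferSketch {
    // Give the writer a flush threshold so large it effectively never
    // flushes on its own; an external controller decides when to free
    // indexing heap (in Elasticsearch, by asking the shard to refresh).
    public static IndexWriter openWriter(Directory dir) throws IOException {
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setRAMBufferSizeMB(256.0); // mirrors new ByteSizeValue(256, ByteSizeUnit.MB) above
        return new IndexWriter(dir, iwc);
    }
}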
IndexingMemoryController.java

@@ -170,7 +170,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
     }
 
     /** ask this shard to refresh, in the background, to free up heap */
-    public void refreshShardAsync(ShardId shardId) {
+    protected void refreshShardAsync(ShardId shardId) {
         IndexShard shard = getShard(shardId);
         if (shard != null) {
             shard.refreshAsync("memory");
@@ -238,6 +238,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> {
             totalBytesUsed += getIndexBufferRAMBytesUsed(shardId);
         }
 
+        System.out.println("TOTAL=" + totalBytesUsed + " vs " + indexingBuffer);
+
         if (totalBytesUsed > indexingBuffer.bytes()) {
             // OK we are using too much; make a queue and ask largest shard(s) to refresh:
             logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer);

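Note: the accounting above sums each shard's live indexing-buffer bytes and, once the total exceeds the node-wide budget, builds a queue and asks the largest shard(s) to refresh, per the comment in the hunk. A simplified, standalone sketch of that logic, assuming hypothetical stand-ins with the same shapes as this file's methods (availableShards(), getIndexBufferRAMBytesUsed(ShardId), refreshShardAsync(ShardId)); this is an illustration, not the actual implementation:

import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

void checkIndexingBufferBudget(long indexingBufferBytes) {
    Map<ShardId, Long> bytesPerShard = new HashMap<>();
    long totalBytesUsed = 0;
    for (ShardId shardId : availableShards()) {
        long used = getIndexBufferRAMBytesUsed(shardId);
        bytesPerShard.put(shardId, used);
        totalBytesUsed += used;
    }
    if (totalBytesUsed > indexingBufferBytes) {
        // Over budget: refresh the most heap-consuming shards first until
        // projected usage drops back under the indexing buffer.
        PriorityQueue<ShardId> largestFirst = new PriorityQueue<>(
                (a, b) -> Long.compare(bytesPerShard.get(b), bytesPerShard.get(a)));
        largestFirst.addAll(bytesPerShard.keySet());
        while (totalBytesUsed > indexingBufferBytes && !largestFirst.isEmpty()) {
            ShardId shardId = largestFirst.poll();
            refreshShardAsync(shardId);
            totalBytesUsed -= bytesPerShard.get(shardId);
        }
    }
}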
InternalEngineTests.java

@@ -1565,10 +1565,9 @@ public class InternalEngineTests extends ESTestCase {
     public void testDeletesAloneCanTriggerRefresh() throws Exception {
         Settings settings = Settings.builder()
                 .put(defaultSettings)
-                .put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb")
-                .put(IndexingMemoryController.SHARD_MEMORY_INTERVAL_TIME_SETTING, "100ms").build();
+                .put(IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
         try (Store store = createStore();
-                Engine engine = new InternalEngine(config(settings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
+             Engine engine = new InternalEngine(config(settings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
             for (int i = 0; i < 100; i++) {
                 String id = Integer.toString(i);
                 ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null);
@@ -1578,6 +1577,30 @@ public class InternalEngineTests extends ESTestCase {
             // Force merge so we know all merges are done before we start deleting:
             engine.forceMerge(true, 1, false, false, false);
+
+            // Make a shell of an IMC to check up on indexing buffer usage:
+            IndexingMemoryController imc = new IndexingMemoryController(settings, threadPool, null) {
+                @Override
+                protected IndexShard getShard(ShardId shardId) {
+                    return null;
+                }
+
+                @Override
+                protected List<ShardId> availableShards() {
+                    return Collections.singletonList(new ShardId("foo", 0));
+                }
+
+                @Override
+                protected void refreshShardAsync(ShardId shardId) {
+                    engine.refresh("memory");
+                }
+
+                @Override
+                protected long getIndexBufferRAMBytesUsed(ShardId shardId) {
+                    System.out.println("BYTES USED: " + engine.indexBufferRAMBytesUsed());
+                    return engine.indexBufferRAMBytesUsed();
+                }
+            };
 
             Searcher s = engine.acquireSearcher("test");
             final long version1 = ((DirectoryReader) s.reader()).getVersion();
             s.close();
@@ -1586,18 +1609,10 @@ public class InternalEngineTests extends ESTestCase {
                 engine.delete(new Engine.Delete("test", id, newUid(id), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
             }
 
-            // We must assertBusy because refresh due to version map being full is done in background (REFRESH) thread pool:
-            assertBusy(new Runnable() {
-                @Override
-                public void run() {
-                    Searcher s2 = engine.acquireSearcher("test");
-                    long version2 = ((DirectoryReader) s2.reader()).getVersion();
-                    s2.close();
-                    // 100 buffered deletes will easily exceed 25% of our 1 KB indexing buffer so it should have forced a refresh:
-                    assertThat(version2, greaterThan(version1));
-                }
-            });
+            imc.forceCheck();
+
+            try (Searcher s2 = engine.acquireSearcher("test")) {
+                assertThat(((DirectoryReader) s2.reader()).getVersion(), greaterThan(version1));
+            }
         }
     }
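
Note on why this fixes the failing test: with the engine's own buffer now effectively unbounded (first two files), 100 buffered deletes no longer trigger a size-based refresh from inside the engine, which is what the old assertBusy loop was polling for. The rewritten test instead builds a shell IndexingMemoryController whose overridden hooks route straight to the engine under test, then drives the budget check explicitly with imc.forceCheck(); the overridden refreshShardAsync runs engine.refresh("memory") synchronously, so the version assertion can run immediately and deterministically. Narrowing refreshShardAsync from public to protected (third file) fits the same design: the hook stays overridable for tests like this one while dropping out of the controller's public API.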