mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-06 04:58:50 +00:00
don't call IMC.forceCheck when going active; remove nocommit/sops
This commit is contained in:
parent
1b9e9ed092
commit
77c2445f7d
@ -179,7 +179,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||||||
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
|
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
|
||||||
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
|
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
|
||||||
public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
|
public static final String INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
|
||||||
|
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
|
||||||
|
|
||||||
private final ShardPath path;
|
private final ShardPath path;
|
||||||
|
|
||||||
@ -898,12 +898,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||||||
/** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */
|
/** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */
|
||||||
private void markLastWrite(Engine.Operation op) {
|
private void markLastWrite(Engine.Operation op) {
|
||||||
lastWriteNS = op.startTime();
|
lastWriteNS = op.startTime();
|
||||||
if (active.getAndSet(true) == false) {
|
|
||||||
// We are currently inactive, but a new write operation just showed up, so we now notify IMC
|
|
||||||
// to wake up and fix our indexing buffer. We could do this async instead, but cost should
|
|
||||||
// be low, and it's rare this happens.
|
|
||||||
indexingMemoryController.forceCheck();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
|
private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
|
||||||
@ -963,8 +957,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
|
|
||||||
|
|
||||||
public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) {
|
public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) {
|
||||||
this.failedEngineListener.delegates.add(failedEngineListener);
|
this.failedEngineListener.delegates.add(failedEngineListener);
|
||||||
}
|
}
|
||||||
@ -1177,7 +1169,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||||||
* changes.
|
* changes.
|
||||||
*/
|
*/
|
||||||
public void refreshAsync(final String reason) {
|
public void refreshAsync(final String reason) {
|
||||||
// nocommit this really is async???
|
|
||||||
engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
|
engineConfig.getThreadPool().executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
@ -1202,6 +1193,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
|
// TODO: now that we use refresh to clear the indexing buffer, we should check here if we did that "recently" and
|
||||||
|
// reschedule if so...
|
||||||
if (getEngine().refreshNeeded()) {
|
if (getEngine().refreshNeeded()) {
|
||||||
refresh("schedule");
|
refresh("schedule");
|
||||||
}
|
}
|
||||||
|
@ -54,7 +54,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||||||
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
|
/** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
|
||||||
public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
|
public static final String SHARD_INACTIVE_TIME_SETTING = "indices.memory.shard_inactive_time";
|
||||||
|
|
||||||
/** How frequently we check indexing memory usage (default: 5 seconds). */
|
/** How frequently we check indexing memory usage (default: 1 seconds). */
|
||||||
public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval";
|
public static final String SHARD_MEMORY_INTERVAL_TIME_SETTING = "indices.memory.interval";
|
||||||
|
|
||||||
/** Hardwired translog buffer size */
|
/** Hardwired translog buffer size */
|
||||||
@ -107,7 +107,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||||||
|
|
||||||
this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
|
this.inactiveTime = this.settings.getAsTime(SHARD_INACTIVE_TIME_SETTING, TimeValue.timeValueMinutes(5));
|
||||||
// we need to have this relatively small to free up heap quickly enough
|
// we need to have this relatively small to free up heap quickly enough
|
||||||
this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(5));
|
this.interval = this.settings.getAsTime(SHARD_MEMORY_INTERVAL_TIME_SETTING, TimeValue.timeValueSeconds(1));
|
||||||
|
|
||||||
this.statusChecker = new ShardsIndicesStatusChecker();
|
this.statusChecker = new ShardsIndicesStatusChecker();
|
||||||
|
|
||||||
@ -238,12 +238,9 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
|
|||||||
totalBytesUsed += getIndexBufferRAMBytesUsed(shardId);
|
totalBytesUsed += getIndexBufferRAMBytesUsed(shardId);
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("TOTAL=" + totalBytesUsed + " vs " + indexingBuffer);
|
|
||||||
|
|
||||||
if (totalBytesUsed > indexingBuffer.bytes()) {
|
if (totalBytesUsed > indexingBuffer.bytes()) {
|
||||||
// OK we are using too much; make a queue and ask largest shard(s) to refresh:
|
// OK we are using too much; make a queue and ask largest shard(s) to refresh:
|
||||||
logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer);
|
logger.debug("now refreshing some shards: total indexing bytes used [{}] vs index_buffer_size [{}]", new ByteSizeValue(totalBytesUsed), indexingBuffer);
|
||||||
|
|
||||||
PriorityQueue<ShardAndBytesUsed> queue = new PriorityQueue<>();
|
PriorityQueue<ShardAndBytesUsed> queue = new PriorityQueue<>();
|
||||||
for (ShardId shardId : availableShards()) {
|
for (ShardId shardId : availableShards()) {
|
||||||
long shardBytesUsed = getIndexBufferRAMBytesUsed(shardId);
|
long shardBytesUsed = getIndexBufferRAMBytesUsed(shardId);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user