Script: Fix value of `ctx._now` to be current epoch time in milliseconds (#23175)

In update scripts, `ctx._now` uses the same milliseconds value used by the
rest of the system to calculate deltas. However, that time is not
actually epoch milliseconds, as it is derived from `System.nanoTime()`.
This change reworks the estimated time thread in ThreadPool which this
time is based on to make available both the relative time, as well as
absolute milliseconds (epoch) which may be used with calendar system. It
also renames the EstimatedTimeThread to a more apt CachedTimeThread.

closes #23169
This commit is contained in:
Ryan Ernst 2017-02-22 15:11:02 -08:00 committed by GitHub
parent 175bda64a0
commit 18f57c05cf
7 changed files with 80 additions and 29 deletions

View File

@ -225,7 +225,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
final UpdateHelper.Result translate;
// translate update request
try {
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
translate = updateHelper.prepare(updateRequest, primary, threadPool::absoluteTimeInMillis);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request

View File

@ -171,7 +171,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
final ShardId shardId = request.getShardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.getId());
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::estimatedTimeInMillis);
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis);
switch (result.getResponseResult()) {
case CREATED:
IndexRequest upsertRequest = result.action();

View File

@ -188,7 +188,7 @@ public final class EngineConfig {
/**
* Returns a thread-pool mainly used to get estimated time stamps from
* {@link org.elasticsearch.threadpool.ThreadPool#estimatedTimeInMillis()} and to schedule
* {@link org.elasticsearch.threadpool.ThreadPool#relativeTimeInMillis()} and to schedule
* async force merge calls on the {@link org.elasticsearch.threadpool.ThreadPool.Names#FORCE_MERGE} thread-pool
*/
public ThreadPool getThreadPool() {

View File

@ -147,7 +147,7 @@ public class InternalEngine extends Engine {
EngineMergeScheduler scheduler = null;
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
@ -446,7 +446,7 @@ public class InternalEngine extends Engine {
private long checkDeletedAndGCed(VersionValue versionValue) {
long currentVersion;
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().relativeTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
@ -726,7 +726,7 @@ public class InternalEngine extends Engine {
private void maybePruneDeletedTombstones() {
// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
// every 1/4 of gcDeletesInMillis:
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
pruneDeletedTombstones();
}
}
@ -772,7 +772,7 @@ public class InternalEngine extends Engine {
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().relativeTimeInMillis()));
}
if (!deleteResult.hasFailure()) {
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
@ -1047,7 +1047,7 @@ public class InternalEngine extends Engine {
}
private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...

View File

@ -561,7 +561,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
private void contextProcessedSuccessfully(SearchContext context) {
context.accessed(threadPool.estimatedTimeInMillis());
context.accessed(threadPool.relativeTimeInMillis());
}
private void cleanContext(SearchContext context) {
@ -794,7 +794,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
class Reaper implements Runnable {
@Override
public void run() {
final long time = threadPool.estimatedTimeInMillis();
final long time = threadPool.relativeTimeInMillis();
for (SearchContext context : activeContexts.values()) {
// Use the same value for both checks since lastAccessTime can
// be modified by another thread between checks!

View File

@ -142,7 +142,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
private final ScheduledThreadPoolExecutor scheduler;
private final EstimatedTimeThread estimatedTimeThread;
private final CachedTimeThread cachedTimeThread;
static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();
@ -213,16 +213,33 @@ public class ThreadPool extends AbstractComponent implements Closeable {
this.scheduler.setRemoveOnCancelPolicy(true);
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.estimatedTimeThread.start();
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.cachedTimeThread.start();
}
public long estimatedTimeInMillis() {
return estimatedTimeThread.estimatedTimeInMillis();
/**
* Returns a value of milliseconds that may be used for relative time calculations.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
public long relativeTimeInMillis() {
return cachedTimeThread.relativeTimeInMillis();
}
/**
* Returns the value of milliseconds since UNIX epoch.
*
* This method should only be used for exact date/time formatting. For calculating
* time deltas that should not suffer from negative deltas, which are possible with
* this method, see {@link #relativeTimeInMillis()}.
*/
public long absoluteTimeInMillis() {
return cachedTimeThread.absoluteTimeInMillis();
}
public Counter estimatedTimeInMillisCounter() {
return estimatedTimeThread.counter;
return cachedTimeThread.counter;
}
public ThreadPoolInfo info() {
@ -342,8 +359,8 @@ public class ThreadPool extends AbstractComponent implements Closeable {
}
public void shutdown() {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
cachedTimeThread.running = false;
cachedTimeThread.interrupt();
scheduler.shutdown();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
@ -353,8 +370,8 @@ public class ThreadPool extends AbstractComponent implements Closeable {
}
public void shutdownNow() {
estimatedTimeThread.running = false;
estimatedTimeThread.interrupt();
cachedTimeThread.running = false;
cachedTimeThread.interrupt();
scheduler.shutdownNow();
for (ExecutorHolder executor : executors.values()) {
if (executor.executor() instanceof ThreadPoolExecutor) {
@ -371,7 +388,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
}
}
estimatedTimeThread.join(unit.toMillis(timeout));
cachedTimeThread.join(unit.toMillis(timeout));
return result;
}
@ -471,29 +488,50 @@ public class ThreadPool extends AbstractComponent implements Closeable {
}
}
static class EstimatedTimeThread extends Thread {
/**
* A thread to cache millisecond time values from
* {@link System#nanoTime()} and {@link System#currentTimeMillis()}.
*
* The values are updated at a specified interval.
*/
static class CachedTimeThread extends Thread {
final long interval;
final TimeCounter counter;
volatile boolean running = true;
volatile long estimatedTimeInMillis;
volatile long relativeMillis;
volatile long absoluteMillis;
EstimatedTimeThread(String name, long interval) {
CachedTimeThread(String name, long interval) {
super(name);
this.interval = interval;
this.estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime());
this.relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
this.absoluteMillis = System.currentTimeMillis();
this.counter = new TimeCounter();
setDaemon(true);
}
public long estimatedTimeInMillis() {
return this.estimatedTimeInMillis;
/**
* Return the current time used for relative calculations. This is
* {@link System#nanoTime()} truncated to milliseconds.
*/
long relativeTimeInMillis() {
return relativeMillis;
}
/**
* Return the current epoch time, used to find absolute time. This is
* a cached version of {@link System#currentTimeMillis()}.
*/
long absoluteTimeInMillis() {
return absoluteMillis;
}
@Override
public void run() {
while (running) {
estimatedTimeInMillis = TimeValue.nsecToMSec(System.nanoTime());
relativeMillis = TimeValue.nsecToMSec(System.nanoTime());
absoluteMillis = System.currentTimeMillis();
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
@ -512,7 +550,7 @@ public class ThreadPool extends AbstractComponent implements Closeable {
@Override
public long get() {
return estimatedTimeInMillis;
return relativeMillis;
}
}
}

View File

@ -46,4 +46,17 @@ public class ThreadPoolTests extends ESTestCase {
assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value));
}
public void testAbsoluteTime() throws Exception {
TestThreadPool threadPool = new TestThreadPool("test");
try {
long currentTime = System.currentTimeMillis();
long gotTime = threadPool.absoluteTimeInMillis();
long delta = Math.abs(gotTime - currentTime);
assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime,
delta < 10000); // the delta can be large, we just care it is the same order of magnitude
} finally {
threadPool.shutdown();
threadPool.close();
}
}
}