diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java index 2ee80badb74..2d8934ba3b3 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java @@ -23,19 +23,19 @@ package org.elasticsearch.common.util.concurrent; * A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation * through execution as well as only execution time. */ -class TimedRunnable implements Runnable { +class TimedRunnable extends AbstractRunnable { private final Runnable original; private final long creationTimeNanos; private long startTimeNanos; private long finishTimeNanos = -1; - TimedRunnable(Runnable original) { + TimedRunnable(final Runnable original) { this.original = original; this.creationTimeNanos = System.nanoTime(); } @Override - public void run() { + public void doRun() { try { startTimeNanos = System.nanoTime(); original.run(); @@ -44,6 +44,32 @@ class TimedRunnable implements Runnable { } } + @Override + public void onRejection(final Exception e) { + if (original instanceof AbstractRunnable) { + ((AbstractRunnable) original).onRejection(e); + } + } + + @Override + public void onAfter() { + if (original instanceof AbstractRunnable) { + ((AbstractRunnable) original).onAfter(); + } + } + + @Override + public void onFailure(final Exception e) { + if (original instanceof AbstractRunnable) { + ((AbstractRunnable) original).onFailure(e); + } + } + + @Override + public boolean isForceExecution() { + return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution(); + } + /** * Return the time since this task was created until it finished running. * If the task is still running or has not yet been run, returns -1. @@ -67,4 +93,5 @@ class TimedRunnable implements Runnable { } return finishTimeNanos - startTimeNanos; } + } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/TimedRunnableTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/TimedRunnableTests.java new file mode 100644 index 00000000000..b61f47e67a3 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/TimedRunnableTests.java @@ -0,0 +1,117 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; + +public final class TimedRunnableTests extends ESTestCase { + + public void testTimedRunnableDelegatesToAbstractRunnable() { + final boolean isForceExecution = randomBoolean(); + final AtomicBoolean onAfter = new AtomicBoolean(); + final AtomicReference onRejection = new AtomicReference<>(); + final AtomicReference onFailure = new AtomicReference<>(); + final AtomicBoolean doRun = new AtomicBoolean(); + + final AbstractRunnable runnable = new AbstractRunnable() { + @Override + public boolean isForceExecution() { + return isForceExecution; + } + + @Override + public void onAfter() { + onAfter.set(true); + } + + @Override + public void onRejection(final Exception e) { + onRejection.set(e); + } + + @Override + public void onFailure(final Exception e) { + onFailure.set(e); + } + + @Override + protected void doRun() throws Exception { + doRun.set(true); + } + }; + + final TimedRunnable timedRunnable = new TimedRunnable(runnable); + + assertThat(timedRunnable.isForceExecution(), equalTo(isForceExecution)); + + timedRunnable.onAfter(); + assertTrue(onAfter.get()); + + final Exception rejection = new RejectedExecutionException(); + timedRunnable.onRejection(rejection); + assertThat(onRejection.get(), equalTo(rejection)); + + final Exception failure = new Exception(); + timedRunnable.onFailure(failure); + assertThat(onFailure.get(), equalTo(failure)); + + timedRunnable.run(); + assertTrue(doRun.get()); + } + + public void testTimedRunnableDelegatesRunInFailureCase() { + final AtomicBoolean onAfter = new AtomicBoolean(); + final AtomicReference onFailure = new AtomicReference<>(); + final AtomicBoolean doRun = new AtomicBoolean(); + + final Exception exception = new Exception(); + + final AbstractRunnable runnable = new AbstractRunnable() { + @Override + public void onAfter() { + onAfter.set(true); + } + + @Override + public void onFailure(final Exception e) { + onFailure.set(e); + } + + @Override + protected void doRun() throws Exception { + doRun.set(true); + throw exception; + } + }; + + final TimedRunnable timedRunnable = new TimedRunnable(runnable); + timedRunnable.run(); + assertTrue(doRun.get()); + assertThat(onFailure.get(), equalTo(exception)); + assertTrue(onAfter.get()); + } + +}