Improve pending api to include current executing class

the pending tasks api will now include the current executing tasks (with a proper marker boolean flag)
this will also help in tests that wait for no pending tasks, to also wait till the current executing task is done
closes #6744
This commit is contained in:
Shay Banon 2014-07-05 14:55:06 +02:00
parent c8e553054b
commit 1d860f70ca
6 changed files with 85 additions and 53 deletions

View File

@ -89,10 +89,11 @@ public class PendingClusterTasksResponse extends ActionResponse implements Itera
builder.startArray(Fields.TASKS);
for (PendingClusterTask pendingClusterTask : this) {
builder.startObject();
builder.field(Fields.INSERT_ORDER, pendingClusterTask.insertOrder());
builder.field(Fields.PRIORITY, pendingClusterTask.priority());
builder.field(Fields.SOURCE, pendingClusterTask.source());
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.timeInQueueInMillis());
builder.field(Fields.INSERT_ORDER, pendingClusterTask.getInsertOrder());
builder.field(Fields.PRIORITY, pendingClusterTask.getPriority());
builder.field(Fields.SOURCE, pendingClusterTask.getSource());
builder.field(Fields.EXECUTING, pendingClusterTask.isExecuting());
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
builder.endObject();
}
@ -103,6 +104,7 @@ public class PendingClusterTasksResponse extends ActionResponse implements Itera
static final class Fields {
static final XContentBuilderString TASKS = new XContentBuilderString("tasks");
static final XContentBuilderString EXECUTING = new XContentBuilderString("executing");
static final XContentBuilderString INSERT_ORDER = new XContentBuilderString("insert_order");
static final XContentBuilderString PRIORITY = new XContentBuilderString("priority");
static final XContentBuilderString SOURCE = new XContentBuilderString("source");

View File

@ -279,7 +279,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
timeInQueue = -1;
}
pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue));
pendingClusterTasks.add(new PendingClusterTask(pending.insertionOrder, pending.priority, new StringText(source), timeInQueue, pending.executing));
}
return pendingClusterTasks;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.service;
import org.elasticsearch.Version;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -36,59 +37,52 @@ public class PendingClusterTask implements Streamable {
private Priority priority;
private Text source;
private long timeInQueue;
private boolean executing;
public PendingClusterTask() {
}
public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue) {
public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing) {
this.insertOrder = insertOrder;
this.priority = priority;
this.source = source;
this.timeInQueue = timeInQueue;
}
public long insertOrder() {
return insertOrder;
this.executing = executing;
}
public long getInsertOrder() {
return insertOrder();
}
public Priority priority() {
return priority;
return insertOrder;
}
public Priority getPriority() {
return priority();
}
public Text source() {
return source;
return priority;
}
public Text getSource() {
return source();
}
public long timeInQueueInMillis() {
return timeInQueue;
return source;
}
public long getTimeInQueueInMillis() {
return timeInQueueInMillis();
return timeInQueue;
}
public TimeValue getTimeInQueue() {
return new TimeValue(getTimeInQueueInMillis());
}
public boolean isExecuting() {
return executing;
}
@Override
public void readFrom(StreamInput in) throws IOException {
insertOrder = in.readVLong();
priority = Priority.readFrom(in);
source = in.readText();
timeInQueue = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
executing = in.readBoolean();
}
}
@Override
@ -97,5 +91,8 @@ public class PendingClusterTask implements Streamable {
Priority.writeTo(priority, out);
out.writeText(source);
out.writeVLong(timeInQueue);
if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
out.writeBoolean(executing);
}
}
}

View File

@ -18,9 +18,12 @@
*/
package org.elasticsearch.common.util.concurrent;
import com.google.common.collect.Lists;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
@ -34,25 +37,39 @@ import java.util.concurrent.atomic.AtomicLong;
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private AtomicLong insertionOrder = new AtomicLong();
private Queue<Runnable> current = ConcurrentCollections.newQueue();
PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory);
}
public Pending[] getPending() {
Object[] objects = getQueue().toArray();
Pending[] infos = new Pending[objects.length];
for (int i = 0; i < objects.length; i++) {
Object obj = objects[i];
if (obj instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) obj;
infos[i] = new Pending(t.runnable, t.priority(), t.insertionOrder);
} else if (obj instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) obj;
infos[i] = new Pending(t.task, t.priority, t.insertionOrder);
List<Pending> pending = Lists.newArrayList();
addPending(Lists.newArrayList(current), pending, true);
addPending(Lists.newArrayList(getQueue()), pending, false);
return pending.toArray(new Pending[pending.size()]);
}
private void addPending(List<Runnable> runnables, List<Pending> pending, boolean executing) {
for (Runnable runnable : runnables) {
if (runnable instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable;
pending.add(new Pending(t.runnable, t.priority(), t.insertionOrder, executing));
} else if (runnable instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) runnable;
pending.add(new Pending(t.task, t.priority, t.insertionOrder, executing));
}
}
return infos;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
current.add(r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
current.remove(r);
}
public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) {
@ -106,11 +123,13 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
public final Object task;
public final Priority priority;
public final long insertionOrder;
public final boolean executing;
public Pending(Object task, Priority priority, long insertionOrder) {
public Pending(Object task, Priority priority, long insertionOrder, boolean executing) {
this.task = task;
this.priority = priority;
this.insertionOrder = insertionOrder;
this.executing = executing;
}
}

View File

@ -450,19 +450,23 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
}
// The tasks can be re-ordered, so we need to check out-of-order
Set<String> controlSources = new HashSet<>(Arrays.asList("2", "3", "4", "5", "6", "7", "8", "9", "10"));
Set<String> controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
List<PendingClusterTask> pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks.size(), equalTo(9));
assertThat(pendingClusterTasks.size(), equalTo(10));
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
for (PendingClusterTask task : pendingClusterTasks) {
assertTrue(controlSources.remove(task.source().string()));
assertTrue(controlSources.remove(task.getSource().string()));
}
assertTrue(controlSources.isEmpty());
controlSources = new HashSet<>(Arrays.asList("2", "3", "4", "5", "6", "7", "8", "9", "10"));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
PendingClusterTasksResponse response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(9));
assertThat(response.pendingTasks().size(), equalTo(10));
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
for (PendingClusterTask task : response) {
assertTrue(controlSources.remove(task.source().string()));
assertTrue(controlSources.remove(task.getSource().string()));
}
assertTrue(controlSources.isEmpty());
block1.countDown();
@ -511,18 +515,18 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
Thread.sleep(100);
pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks.size(), equalTo(4));
controlSources = new HashSet<>(Arrays.asList("2", "3", "4", "5"));
assertThat(pendingClusterTasks.size(), equalTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : pendingClusterTasks) {
assertTrue(controlSources.remove(task.source().string()));
assertTrue(controlSources.remove(task.getSource().string()));
}
assertTrue(controlSources.isEmpty());
response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(4));
controlSources = new HashSet<>(Arrays.asList("2", "3", "4", "5"));
assertThat(response.pendingTasks().size(), equalTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : response) {
assertTrue(controlSources.remove(task.source().string()));
assertTrue(controlSources.remove(task.getSource().string()));
assertThat(task.getTimeInQueueInMillis(), greaterThan(0l));
}
assertTrue(controlSources.isEmpty());

View File

@ -178,11 +178,13 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
public void testTimeout() throws Exception {
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
final CountDownLatch invoked = new CountDownLatch(1);
final CountDownLatch block = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
try {
invoked.countDown();
block.await();
} catch (InterruptedException e) {
fail();
@ -194,6 +196,11 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
return "the blocking";
}
});
invoked.await();
PrioritizedEsThreadPoolExecutor.Pending[] pending = executor.getPending();
assertThat(pending.length, equalTo(1));
assertThat(pending[0].task.toString(), equalTo("the blocking"));
assertThat(pending[0].executing, equalTo(true));
final AtomicBoolean executeCalled = new AtomicBoolean();
final CountDownLatch timedOut = new CountDownLatch(1);
@ -215,9 +222,12 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
}
);
PrioritizedEsThreadPoolExecutor.Pending[] pending = executor.getPending();
assertThat(pending.length, equalTo(1));
assertThat(pending[0].task.toString(), equalTo("the waiting"));
pending = executor.getPending();
assertThat(pending.length, equalTo(2));
assertThat(pending[0].task.toString(), equalTo("the blocking"));
assertThat(pending[0].executing, equalTo(true));
assertThat(pending[1].task.toString(), equalTo("the waiting"));
assertThat(pending[1].executing, equalTo(false));
assertThat(timedOut.await(2, TimeUnit.SECONDS), equalTo(true));
block.countDown();