Fixed the stats API

- now reflects the right stats around the execution threadpool

Original commit: elastic/x-pack-elasticsearch@ecf557c6be
This commit is contained in:
uboness 2015-06-24 15:02:15 +02:00
parent be33ee6323
commit b8e80773d9
7 changed files with 29 additions and 30 deletions

View File

@ -117,11 +117,11 @@ public class ExecutionService extends AbstractComponent {
return defaultThrottlePeriod;
}
public long queueSize() {
public long executionThreadPoolQueueSize() {
return executor.queue().size();
}
public long largestQueueSize() {
public long executionThreadPoolMaxSize() {
return executor.largestPoolSize();
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.watcher.execution;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;

View File

@ -55,9 +55,9 @@ public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherS
protected void masterOperation(WatcherStatsRequest request, ClusterState state, ActionListener<WatcherStatsResponse> listener) throws ElasticsearchException {
WatcherStatsResponse statsResponse = new WatcherStatsResponse();
statsResponse.setWatcherState(watcherService.state());
statsResponse.setWatchExecutionQueueSize(executionService.queueSize());
statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize());
statsResponse.setWatchesCount(watcherService.watchesCount());
statsResponse.setWatchExecutionQueueMaxSize(executionService.largestQueueSize());
statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
statsResponse.setVersion(WatcherVersion.CURRENT);
statsResponse.setBuild(WatcherBuild.CURRENT);

View File

@ -28,8 +28,8 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
private WatcherBuild build;
private long watchesCount;
private WatcherState watcherState;
private long watchExecutionQueueSize;
private long watchExecutionQueueMaxSize;
private long threadPoolQueueSize;
private long threadPoolMaxSize;
private List<WatchExecutionSnapshot> snapshots;
private List<QueuedWatch> queuedWatches;
@ -38,25 +38,25 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
}
/**
* @return The current watch execution queue size
* @return The current execution thread pool queue size
*/
public long getExecutionQueueSize() {
return watchExecutionQueueSize;
public long getThreadPoolQueueSize() {
return threadPoolQueueSize;
}
void setWatchExecutionQueueSize(long watchExecutionQueueSize) {
this.watchExecutionQueueSize = watchExecutionQueueSize;
void setThreadPoolQueueSize(long threadPoolQueueSize) {
this.threadPoolQueueSize = threadPoolQueueSize;
}
/**
* @return The max size of the watch execution queue
* @return The max number of threads in the execution thread pool
*/
public long getWatchExecutionQueueMaxSize() {
return watchExecutionQueueMaxSize;
public long getThreadPoolMaxSize() {
return threadPoolMaxSize;
}
void setWatchExecutionQueueMaxSize(long watchExecutionQueueMaxSize) {
this.watchExecutionQueueMaxSize = watchExecutionQueueMaxSize;
void setThreadPoolMaxSize(long threadPoolMaxSize) {
this.threadPoolMaxSize = threadPoolMaxSize;
}
/**
@ -125,8 +125,8 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
watchesCount = in.readLong();
watchExecutionQueueSize = in.readLong();
watchExecutionQueueMaxSize = in.readLong();
threadPoolQueueSize = in.readLong();
threadPoolMaxSize = in.readLong();
watcherState = WatcherState.fromId(in.readByte());
version = WatcherVersion.readVersion(in);
build = WatcherBuild.readBuild(in);
@ -151,8 +151,8 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(watchesCount);
out.writeLong(watchExecutionQueueSize);
out.writeLong(watchExecutionQueueMaxSize);
out.writeLong(threadPoolQueueSize);
out.writeLong(threadPoolMaxSize);
out.writeByte(watcherState.getId());
WatcherVersion.writeVersion(version, out);
WatcherBuild.writeBuild(build, out);
@ -182,9 +182,9 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
builder.startObject();
builder.field("watcher_state", watcherState.toString().toLowerCase(Locale.ROOT));
builder.field("watch_count", watchesCount);
builder.startObject("execution_queue");
builder.field("size", watchExecutionQueueSize);
builder.field("max_size", watchExecutionQueueMaxSize);
builder.startObject("execution_thread_pool");
builder.field("queue_size", threadPoolQueueSize);
builder.field("max_size", threadPoolMaxSize);
builder.endObject();
if (snapshots != null) {

View File

@ -280,7 +280,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
// that numRecords watch records have been processed as part of starting up.
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getExecutionQueueSize(), equalTo(0l));
assertThat(response.getThreadPoolQueueSize(), equalTo(0l));
// but even then since the execution of the watch record is async it may take a little bit before
// the actual documents are in the output index
@ -332,7 +332,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
// that numRecords watch records have been processed as part of starting up.
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatcherState(), equalTo(WatcherState.STARTED));
assertThat(response.getExecutionQueueSize(), equalTo(0l));
assertThat(response.getThreadPoolQueueSize(), equalTo(0l));
// but even then since the execution of the watch record is async it may take a little bit before
// the actual documents are in the output index

View File

@ -255,7 +255,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTests {
assertThat(watcherService.state(), is(WatcherState.STOPPED));
}
for (ExecutionService executionService : internalTestCluster().getInstances(ExecutionService.class)) {
assertThat(executionService.queueSize(), equalTo(0l));
assertThat(executionService.executionThreadPoolQueueSize(), equalTo(0l));
}
}

View File

@ -44,9 +44,9 @@ public class WatcherStatsTests extends AbstractWatcherIntegrationTests {
WatcherStatsResponse response = watcherClient().watcherStats(watcherStatsRequest).actionGet();
assertThat(response.getWatcherState(), is(WatcherState.STARTED));
assertThat(response.getExecutionQueueSize(), is(0L));
assertThat(response.getThreadPoolQueueSize(), is(0L));
assertThat(response.getWatchesCount(), is(0L));
assertThat(response.getWatchExecutionQueueMaxSize(), is(timeWarped() ? 1L : 0L));
assertThat(response.getThreadPoolMaxSize(), is(timeWarped() ? 1L : 0L));
assertThat(response.getVersion(), is(WatcherVersion.CURRENT));
assertThat(response.getBuild(), is(WatcherBuild.CURRENT));
}
@ -80,6 +80,6 @@ public class WatcherStatsTests extends AbstractWatcherIntegrationTests {
assertThat(response.getWatcherState(), is(WatcherState.STARTED));
assertThat(response.getWatchesCount(), is(1L));
assertThat(response.getWatchExecutionQueueMaxSize(), greaterThan(0L));
assertThat(response.getThreadPoolMaxSize(), greaterThan(0L));
}
}