[Rollup] Add more diagnostic stats to job (#35471)
* [Rollup] Add more diagnostic stats to job To help debug future performance issues, this adds the min/max/avg/count/total latencies (in milliseconds) for search and bulk phase. This latency is the total service time including transfer between nodes, not just the `took` time. It also adds the count of search/bulk failures encountered during runtime. This information is also in the log, but a runtime counter will help expose problems faster * review cleanup * Remove dead ParseFields
This commit is contained in:
parent
af9233b067
commit
48fa251812
|
@ -26,15 +26,15 @@ import org.elasticsearch.common.xcontent.ObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
|
||||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
|
|
||||||
import static java.util.Collections.unmodifiableList;
|
import static java.util.Collections.unmodifiableList;
|
||||||
import static java.util.stream.Collectors.joining;
|
import static java.util.stream.Collectors.joining;
|
||||||
|
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
||||||
|
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Response from rollup's get jobs api.
|
* Response from rollup's get jobs api.
|
||||||
|
@ -51,6 +51,12 @@ public class GetRollupJobResponse {
|
||||||
static final ParseField STATE = new ParseField("job_state");
|
static final ParseField STATE = new ParseField("job_state");
|
||||||
static final ParseField CURRENT_POSITION = new ParseField("current_position");
|
static final ParseField CURRENT_POSITION = new ParseField("current_position");
|
||||||
static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");
|
static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");
|
||||||
|
static final ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
|
||||||
|
static final ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
|
||||||
|
static final ParseField INDEX_TOTAL = new ParseField("index_total");
|
||||||
|
static final ParseField SEARCH_TOTAL = new ParseField("search_total");
|
||||||
|
static final ParseField SEARCH_FAILURES = new ParseField("search_failures");
|
||||||
|
static final ParseField INDEX_FAILURES = new ParseField("index_failures");
|
||||||
|
|
||||||
private List<JobWrapper> jobs;
|
private List<JobWrapper> jobs;
|
||||||
|
|
||||||
|
@ -181,12 +187,25 @@ public class GetRollupJobResponse {
|
||||||
private final long numInputDocuments;
|
private final long numInputDocuments;
|
||||||
private final long numOuputDocuments;
|
private final long numOuputDocuments;
|
||||||
private final long numInvocations;
|
private final long numInvocations;
|
||||||
|
private long indexTime;
|
||||||
|
private long indexTotal;
|
||||||
|
private long searchTime;
|
||||||
|
private long searchTotal;
|
||||||
|
private long indexFailures;
|
||||||
|
private long searchFailures;
|
||||||
|
|
||||||
RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
|
RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
|
||||||
|
long indexTime, long indexTotal, long searchTime, long searchTotal, long indexFailures, long searchFailures) {
|
||||||
this.numPages = numPages;
|
this.numPages = numPages;
|
||||||
this.numInputDocuments = numInputDocuments;
|
this.numInputDocuments = numInputDocuments;
|
||||||
this.numOuputDocuments = numOuputDocuments;
|
this.numOuputDocuments = numOuputDocuments;
|
||||||
this.numInvocations = numInvocations;
|
this.numInvocations = numInvocations;
|
||||||
|
this.indexTime = indexTime;
|
||||||
|
this.indexTotal = indexTotal;
|
||||||
|
this.searchTime = searchTime;
|
||||||
|
this.searchTotal = searchTotal;
|
||||||
|
this.indexFailures = indexFailures;
|
||||||
|
this.searchFailures = searchFailures;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -217,15 +236,65 @@ public class GetRollupJobResponse {
|
||||||
return numOuputDocuments;
|
return numOuputDocuments;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of failures that have occurred during the bulk indexing phase of Rollup
|
||||||
|
*/
|
||||||
|
public long getIndexFailures() {
|
||||||
|
return indexFailures;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of failures that have occurred during the search phase of Rollup
|
||||||
|
*/
|
||||||
|
public long getSearchFailures() {
|
||||||
|
return searchFailures;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the time spent indexing (cumulative) in milliseconds
|
||||||
|
*/
|
||||||
|
public long getIndexTime() {
|
||||||
|
return indexTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the time spent searching (cumulative) in milliseconds
|
||||||
|
*/
|
||||||
|
public long getSearchTime() {
|
||||||
|
return searchTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the total number of indexing requests that have been sent by the rollup job
|
||||||
|
* (Note: this is not the number of _documents_ that have been indexed)
|
||||||
|
*/
|
||||||
|
public long getIndexTotal() {
|
||||||
|
return indexTotal;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the total number of search requests that have been sent by the rollup job
|
||||||
|
*/
|
||||||
|
public long getSearchTotal() {
|
||||||
|
return searchTotal;
|
||||||
|
}
|
||||||
|
|
||||||
private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
|
private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
|
||||||
STATS.getPreferredName(),
|
STATS.getPreferredName(),
|
||||||
true,
|
true,
|
||||||
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
|
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
|
||||||
|
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
|
||||||
static {
|
static {
|
||||||
PARSER.declareLong(constructorArg(), NUM_PAGES);
|
PARSER.declareLong(constructorArg(), NUM_PAGES);
|
||||||
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
|
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
|
||||||
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
|
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
|
||||||
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
|
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
|
||||||
|
PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
|
||||||
|
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
|
||||||
|
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
|
||||||
|
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
|
||||||
|
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
|
||||||
|
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -234,14 +303,21 @@ public class GetRollupJobResponse {
|
||||||
if (other == null || getClass() != other.getClass()) return false;
|
if (other == null || getClass() != other.getClass()) return false;
|
||||||
RollupIndexerJobStats that = (RollupIndexerJobStats) other;
|
RollupIndexerJobStats that = (RollupIndexerJobStats) other;
|
||||||
return Objects.equals(this.numPages, that.numPages)
|
return Objects.equals(this.numPages, that.numPages)
|
||||||
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
|
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
|
||||||
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
|
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
|
||||||
&& Objects.equals(this.numInvocations, that.numInvocations);
|
&& Objects.equals(this.numInvocations, that.numInvocations)
|
||||||
|
&& Objects.equals(this.indexTime, that.indexTime)
|
||||||
|
&& Objects.equals(this.searchTime, that.searchTime)
|
||||||
|
&& Objects.equals(this.indexFailures, that.indexFailures)
|
||||||
|
&& Objects.equals(this.searchFailures, that.searchFailures)
|
||||||
|
&& Objects.equals(this.searchTotal, that.searchTotal)
|
||||||
|
&& Objects.equals(this.indexTotal, that.indexTotal);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
|
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
|
||||||
|
indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -249,7 +325,13 @@ public class GetRollupJobResponse {
|
||||||
return "{pages=" + numPages
|
return "{pages=" + numPages
|
||||||
+ ", input_docs=" + numInputDocuments
|
+ ", input_docs=" + numInputDocuments
|
||||||
+ ", output_docs=" + numOuputDocuments
|
+ ", output_docs=" + numOuputDocuments
|
||||||
+ ", invocations=" + numInvocations + "}";
|
+ ", invocations=" + numInvocations
|
||||||
|
+ ", index_failures=" + indexFailures
|
||||||
|
+ ", search_failures=" + searchFailures
|
||||||
|
+ ", index_time_in_ms=" + indexTime
|
||||||
|
+ ", index_total=" + indexTotal
|
||||||
|
+ ", search_time_in_ms=" + searchTime
|
||||||
|
+ ", search_total=" + searchTotal+ "}";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,9 @@ public class GetRollupJobResponseTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private RollupIndexerJobStats randomStats() {
|
private RollupIndexerJobStats randomStats() {
|
||||||
return new RollupIndexerJobStats(randomLong(), randomLong(), randomLong(), randomLong());
|
return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
|
||||||
|
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
|
||||||
|
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
|
||||||
}
|
}
|
||||||
|
|
||||||
private RollupJobStatus randomStatus() {
|
private RollupJobStatus randomStatus() {
|
||||||
|
@ -115,6 +117,13 @@ public class GetRollupJobResponseTests extends ESTestCase {
|
||||||
builder.field(GetRollupJobResponse.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
|
builder.field(GetRollupJobResponse.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
|
||||||
builder.field(GetRollupJobResponse.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
|
builder.field(GetRollupJobResponse.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
|
||||||
builder.field(GetRollupJobResponse.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
|
builder.field(GetRollupJobResponse.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
|
||||||
|
builder.field(GetRollupJobResponse.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
|
||||||
|
builder.field(GetRollupJobResponse.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
|
||||||
|
builder.field(GetRollupJobResponse.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
|
||||||
|
builder.field(GetRollupJobResponse.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
|
||||||
|
builder.field(GetRollupJobResponse.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
|
||||||
|
builder.field(GetRollupJobResponse.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,7 +101,13 @@ Which will yield the following response:
|
||||||
"pages_processed" : 0,
|
"pages_processed" : 0,
|
||||||
"documents_processed" : 0,
|
"documents_processed" : 0,
|
||||||
"rollups_indexed" : 0,
|
"rollups_indexed" : 0,
|
||||||
"trigger_count" : 0
|
"trigger_count" : 0,
|
||||||
|
"index_failures": 0,
|
||||||
|
"index_time_in_ms": 0,
|
||||||
|
"index_total": 0,
|
||||||
|
"search_failures": 0,
|
||||||
|
"search_time_in_ms": 0,
|
||||||
|
"search_total": 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
@ -221,7 +227,13 @@ Which will yield the following response:
|
||||||
"pages_processed" : 0,
|
"pages_processed" : 0,
|
||||||
"documents_processed" : 0,
|
"documents_processed" : 0,
|
||||||
"rollups_indexed" : 0,
|
"rollups_indexed" : 0,
|
||||||
"trigger_count" : 0
|
"trigger_count" : 0,
|
||||||
|
"index_failures": 0,
|
||||||
|
"index_time_in_ms": 0,
|
||||||
|
"index_total": 0,
|
||||||
|
"search_failures": 0,
|
||||||
|
"search_time_in_ms": 0,
|
||||||
|
"search_total": 0
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -270,7 +282,13 @@ Which will yield the following response:
|
||||||
"pages_processed" : 0,
|
"pages_processed" : 0,
|
||||||
"documents_processed" : 0,
|
"documents_processed" : 0,
|
||||||
"rollups_indexed" : 0,
|
"rollups_indexed" : 0,
|
||||||
"trigger_count" : 0
|
"trigger_count" : 0,
|
||||||
|
"index_failures": 0,
|
||||||
|
"index_time_in_ms": 0,
|
||||||
|
"index_total": 0,
|
||||||
|
"search_failures": 0,
|
||||||
|
"search_time_in_ms": 0,
|
||||||
|
"search_total": 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -153,9 +153,10 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
|
||||||
// fire off the search. Note this is async, the method will return from here
|
// fire off the search. Note this is async, the method will return from here
|
||||||
executor.execute(() -> {
|
executor.execute(() -> {
|
||||||
try {
|
try {
|
||||||
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc)));
|
stats.markStartSearch();
|
||||||
|
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
finishWithFailure(e);
|
finishWithSearchFailure(e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
|
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
|
||||||
|
@ -256,7 +257,13 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
|
||||||
*/
|
*/
|
||||||
protected abstract void onAbort();
|
protected abstract void onAbort();
|
||||||
|
|
||||||
private void finishWithFailure(Exception exc) {
|
private void finishWithSearchFailure(Exception exc) {
|
||||||
|
stats.incrementSearchFailures();
|
||||||
|
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void finishWithIndexingFailure(Exception exc) {
|
||||||
|
stats.incrementIndexingFailures();
|
||||||
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
|
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,6 +298,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onSearchResponse(SearchResponse searchResponse) {
|
private void onSearchResponse(SearchResponse searchResponse) {
|
||||||
|
stats.markEndSearch();
|
||||||
try {
|
try {
|
||||||
if (checkState(getState()) == false) {
|
if (checkState(getState()) == false) {
|
||||||
return;
|
return;
|
||||||
|
@ -320,6 +328,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
|
||||||
// TODO this might be a valid case, e.g. if implementation filters
|
// TODO this might be a valid case, e.g. if implementation filters
|
||||||
assert bulkRequest.requests().size() > 0;
|
assert bulkRequest.requests().size() > 0;
|
||||||
|
|
||||||
|
stats.markStartIndexing();
|
||||||
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
|
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
|
||||||
// TODO we should check items in the response and move after accordingly to
|
// TODO we should check items in the response and move after accordingly to
|
||||||
// resume the failing buckets ?
|
// resume the failing buckets ?
|
||||||
|
@ -335,16 +344,16 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
|
||||||
position.set(newPosition);
|
position.set(newPosition);
|
||||||
|
|
||||||
onBulkResponse(bulkResponse, newPosition);
|
onBulkResponse(bulkResponse, newPosition);
|
||||||
}, exc -> finishWithFailure(exc)));
|
}, this::finishWithIndexingFailure));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
finishWithFailure(e);
|
finishWithSearchFailure(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onBulkResponse(BulkResponse response, JobPosition position) {
|
private void onBulkResponse(BulkResponse response, JobPosition position) {
|
||||||
|
stats.markEndIndexing();
|
||||||
try {
|
try {
|
||||||
|
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
|
||||||
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithFailure);
|
|
||||||
// TODO probably something more intelligent than every-50 is needed
|
// TODO probably something more intelligent than every-50 is needed
|
||||||
if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
|
if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
|
||||||
doSaveState(IndexerState.INDEXING, position, () -> doNextSearch(buildSearchRequest(), listener));
|
doSaveState(IndexerState.INDEXING, position, () -> doNextSearch(buildSearchRequest(), listener));
|
||||||
|
@ -352,7 +361,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
|
||||||
doNextSearch(buildSearchRequest(), listener);
|
doNextSearch(buildSearchRequest(), listener);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
finishWithFailure(e);
|
finishWithIndexingFailure(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.core.indexing;
|
package org.elasticsearch.xpack.core.indexing;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.common.ParseField;
|
import org.elasticsearch.common.ParseField;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
@ -27,15 +28,32 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
|
||||||
protected long numInputDocuments = 0;
|
protected long numInputDocuments = 0;
|
||||||
protected long numOuputDocuments = 0;
|
protected long numOuputDocuments = 0;
|
||||||
protected long numInvocations = 0;
|
protected long numInvocations = 0;
|
||||||
|
protected long indexTime = 0;
|
||||||
|
protected long searchTime = 0;
|
||||||
|
protected long indexTotal = 0;
|
||||||
|
protected long searchTotal = 0;
|
||||||
|
protected long indexFailures = 0;
|
||||||
|
protected long searchFailures = 0;
|
||||||
|
|
||||||
|
private long startIndexTime;
|
||||||
|
private long startSearchTime;
|
||||||
|
|
||||||
public IndexerJobStats() {
|
public IndexerJobStats() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
|
public IndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
|
||||||
|
long indexTime, long searchTime, long indexTotal, long searchTotal,
|
||||||
|
long indexFailures, long searchFailures) {
|
||||||
this.numPages = numPages;
|
this.numPages = numPages;
|
||||||
this.numInputDocuments = numInputDocuments;
|
this.numInputDocuments = numInputDocuments;
|
||||||
this.numOuputDocuments = numOuputDocuments;
|
this.numOuputDocuments = numOuputDocuments;
|
||||||
this.numInvocations = numInvocations;
|
this.numInvocations = numInvocations;
|
||||||
|
this.indexTime = indexTime;
|
||||||
|
this.searchTime = searchTime;
|
||||||
|
this.indexTotal = indexTotal;
|
||||||
|
this.searchTotal = searchTotal;
|
||||||
|
this.indexFailures = indexFailures;
|
||||||
|
this.searchFailures = searchFailures;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndexerJobStats(StreamInput in) throws IOException {
|
public IndexerJobStats(StreamInput in) throws IOException {
|
||||||
|
@ -43,6 +61,15 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
|
||||||
this.numInputDocuments = in.readVLong();
|
this.numInputDocuments = in.readVLong();
|
||||||
this.numOuputDocuments = in.readVLong();
|
this.numOuputDocuments = in.readVLong();
|
||||||
this.numInvocations = in.readVLong();
|
this.numInvocations = in.readVLong();
|
||||||
|
// TODO change this after backport
|
||||||
|
if (in.getVersion().onOrAfter(Version.CURRENT)) {
|
||||||
|
this.indexTime = in.readVLong();
|
||||||
|
this.searchTime = in.readVLong();
|
||||||
|
this.indexTotal = in.readVLong();
|
||||||
|
this.searchTotal = in.readVLong();
|
||||||
|
this.indexFailures = in.readVLong();
|
||||||
|
this.searchFailures = in.readVLong();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getNumPages() {
|
public long getNumPages() {
|
||||||
|
@ -61,6 +88,30 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
|
||||||
return numOuputDocuments;
|
return numOuputDocuments;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getIndexFailures() {
|
||||||
|
return indexFailures;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSearchFailures() {
|
||||||
|
return searchFailures;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIndexTime() {
|
||||||
|
return indexTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSearchTime() {
|
||||||
|
return searchTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIndexTotal() {
|
||||||
|
return indexTotal;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSearchTotal() {
|
||||||
|
return searchTotal;
|
||||||
|
}
|
||||||
|
|
||||||
public void incrementNumPages(long n) {
|
public void incrementNumPages(long n) {
|
||||||
assert(n >= 0);
|
assert(n >= 0);
|
||||||
numPages += n;
|
numPages += n;
|
||||||
|
@ -81,12 +132,47 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
|
||||||
numOuputDocuments += n;
|
numOuputDocuments += n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incrementIndexingFailures() {
|
||||||
|
this.indexFailures += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrementSearchFailures() {
|
||||||
|
this.searchFailures += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void markStartIndexing() {
|
||||||
|
this.startIndexTime = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void markEndIndexing() {
|
||||||
|
indexTime += ((System.nanoTime() - startIndexTime) / 1000000);
|
||||||
|
indexTotal += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void markStartSearch() {
|
||||||
|
this.startSearchTime = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void markEndSearch() {
|
||||||
|
searchTime += ((System.nanoTime() - startSearchTime) / 1000000);
|
||||||
|
searchTotal += 1;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
out.writeVLong(numPages);
|
out.writeVLong(numPages);
|
||||||
out.writeVLong(numInputDocuments);
|
out.writeVLong(numInputDocuments);
|
||||||
out.writeVLong(numOuputDocuments);
|
out.writeVLong(numOuputDocuments);
|
||||||
out.writeVLong(numInvocations);
|
out.writeVLong(numInvocations);
|
||||||
|
// TODO change after backport
|
||||||
|
if (out.getVersion().onOrAfter(Version.CURRENT)) {
|
||||||
|
out.writeVLong(indexTime);
|
||||||
|
out.writeVLong(searchTime);
|
||||||
|
out.writeVLong(indexTotal);
|
||||||
|
out.writeVLong(searchTotal);
|
||||||
|
out.writeVLong(indexFailures);
|
||||||
|
out.writeVLong(searchFailures);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -102,13 +188,20 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
|
||||||
IndexerJobStats that = (IndexerJobStats) other;
|
IndexerJobStats that = (IndexerJobStats) other;
|
||||||
|
|
||||||
return Objects.equals(this.numPages, that.numPages)
|
return Objects.equals(this.numPages, that.numPages)
|
||||||
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
|
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
|
||||||
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
|
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
|
||||||
&& Objects.equals(this.numInvocations, that.numInvocations);
|
&& Objects.equals(this.numInvocations, that.numInvocations)
|
||||||
|
&& Objects.equals(this.indexTime, that.indexTime)
|
||||||
|
&& Objects.equals(this.searchTime, that.searchTime)
|
||||||
|
&& Objects.equals(this.indexFailures, that.indexFailures)
|
||||||
|
&& Objects.equals(this.searchFailures, that.searchFailures)
|
||||||
|
&& Objects.equals(this.indexTotal, that.indexTotal)
|
||||||
|
&& Objects.equals(this.searchTotal, that.searchTotal);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
|
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
|
||||||
|
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,24 +25,40 @@ public class RollupIndexerJobStats extends IndexerJobStats {
|
||||||
private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
|
private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
|
||||||
private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("rollups_indexed");
|
private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("rollups_indexed");
|
||||||
private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
|
private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
|
||||||
|
private static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
|
||||||
|
private static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
|
||||||
|
private static ParseField INDEX_TOTAL = new ParseField("index_total");
|
||||||
|
private static ParseField SEARCH_TOTAL = new ParseField("search_total");
|
||||||
|
private static ParseField SEARCH_FAILURES = new ParseField("search_failures");
|
||||||
|
private static ParseField INDEX_FAILURES = new ParseField("index_failures");
|
||||||
|
|
||||||
public static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER =
|
public static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER =
|
||||||
new ConstructingObjectParser<>(NAME.getPreferredName(),
|
new ConstructingObjectParser<>(NAME.getPreferredName(),
|
||||||
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
|
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
|
||||||
|
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
|
||||||
|
|
||||||
static {
|
static {
|
||||||
PARSER.declareLong(constructorArg(), NUM_PAGES);
|
PARSER.declareLong(constructorArg(), NUM_PAGES);
|
||||||
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
|
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
|
||||||
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
|
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
|
||||||
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
|
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
|
||||||
|
PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
|
||||||
|
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
|
||||||
|
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
|
||||||
|
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
|
||||||
|
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
|
||||||
|
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RollupIndexerJobStats() {
|
public RollupIndexerJobStats() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
public RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
|
public RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
|
||||||
super(numPages, numInputDocuments, numOuputDocuments, numInvocations);
|
long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures,
|
||||||
|
long searchFailures) {
|
||||||
|
super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime,
|
||||||
|
indexTotal, searchTotal, indexFailures, searchFailures);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RollupIndexerJobStats(StreamInput in) throws IOException {
|
public RollupIndexerJobStats(StreamInput in) throws IOException {
|
||||||
|
@ -56,6 +72,12 @@ public class RollupIndexerJobStats extends IndexerJobStats {
|
||||||
builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments);
|
builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments);
|
||||||
builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments);
|
builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments);
|
||||||
builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations);
|
builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations);
|
||||||
|
builder.field(INDEX_TIME_IN_MS.getPreferredName(), indexTime);
|
||||||
|
builder.field(INDEX_TOTAL.getPreferredName(), indexTotal);
|
||||||
|
builder.field(INDEX_FAILURES.getPreferredName(), indexFailures);
|
||||||
|
builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime);
|
||||||
|
builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal);
|
||||||
|
builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures);
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,7 +42,8 @@ public class JobWrapperSerializingTests extends AbstractSerializingTestCase<GetR
|
||||||
|
|
||||||
return new GetRollupJobsAction.JobWrapper(ConfigTestHelpers.randomRollupJobConfig(random()),
|
return new GetRollupJobsAction.JobWrapper(ConfigTestHelpers.randomRollupJobConfig(random()),
|
||||||
new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
|
new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
|
||||||
randomNonNegativeLong(), randomNonNegativeLong()),
|
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
|
||||||
|
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()),
|
||||||
new RollupJobStatus(state, Collections.emptyMap(), randomBoolean()));
|
new RollupJobStatus(state, Collections.emptyMap(), randomBoolean()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,12 @@ public class RollupIndexerJobStatsTests extends AbstractSerializingTestCase<Roll
|
||||||
|
|
||||||
public static RollupIndexerJobStats randomStats() {
|
public static RollupIndexerJobStats randomStats() {
|
||||||
return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
|
return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
|
||||||
randomNonNegativeLong(), randomNonNegativeLong());
|
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
|
||||||
|
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean supportsUnknownFields() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,7 @@ import static org.mockito.Mockito.when;
|
||||||
public class IndexerUtilsTests extends AggregatorTestCase {
|
public class IndexerUtilsTests extends AggregatorTestCase {
|
||||||
public void testMissingFields() throws IOException {
|
public void testMissingFields() throws IOException {
|
||||||
String indexName = randomAlphaOfLengthBetween(1, 10);
|
String indexName = randomAlphaOfLengthBetween(1, 10);
|
||||||
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0);
|
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
|
||||||
|
|
||||||
String timestampField = "the_histo";
|
String timestampField = "the_histo";
|
||||||
String valueField = "the_avg";
|
String valueField = "the_avg";
|
||||||
|
@ -130,7 +130,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
|
||||||
|
|
||||||
public void testCorrectFields() throws IOException {
|
public void testCorrectFields() throws IOException {
|
||||||
String indexName = randomAlphaOfLengthBetween(1, 10);
|
String indexName = randomAlphaOfLengthBetween(1, 10);
|
||||||
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0);
|
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
|
||||||
|
|
||||||
String timestampField = "the_histo";
|
String timestampField = "the_histo";
|
||||||
String valueField = "the_avg";
|
String valueField = "the_avg";
|
||||||
|
@ -198,7 +198,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
|
||||||
|
|
||||||
public void testNumericTerms() throws IOException {
|
public void testNumericTerms() throws IOException {
|
||||||
String indexName = randomAlphaOfLengthBetween(1, 10);
|
String indexName = randomAlphaOfLengthBetween(1, 10);
|
||||||
RollupIndexerJobStats stats= new RollupIndexerJobStats(0, 0, 0, 0);
|
RollupIndexerJobStats stats= new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
|
||||||
|
|
||||||
String valueField = "the_avg";
|
String valueField = "the_avg";
|
||||||
|
|
||||||
|
@ -254,7 +254,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
|
||||||
|
|
||||||
public void testEmptyCounts() throws IOException {
|
public void testEmptyCounts() throws IOException {
|
||||||
String indexName = randomAlphaOfLengthBetween(1, 10);
|
String indexName = randomAlphaOfLengthBetween(1, 10);
|
||||||
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0);
|
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
|
||||||
|
|
||||||
String timestampField = "ts";
|
String timestampField = "ts";
|
||||||
String valueField = "the_avg";
|
String valueField = "the_avg";
|
||||||
|
@ -490,7 +490,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
|
||||||
|
|
||||||
public void testMissingBuckets() throws IOException {
|
public void testMissingBuckets() throws IOException {
|
||||||
String indexName = randomAlphaOfLengthBetween(1, 10);
|
String indexName = randomAlphaOfLengthBetween(1, 10);
|
||||||
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0);
|
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
|
||||||
|
|
||||||
String metricField = "metric_field";
|
String metricField = "metric_field";
|
||||||
String valueField = "value_field";
|
String valueField = "value_field";
|
||||||
|
|
|
@ -20,11 +20,11 @@ import org.elasticsearch.search.aggregations.Aggregations;
|
||||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||||
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
|
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.xpack.core.indexing.IndexerState;
|
||||||
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
|
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
|
||||||
import org.elasticsearch.xpack.core.rollup.RollupField;
|
import org.elasticsearch.xpack.core.rollup.RollupField;
|
||||||
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
|
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
|
||||||
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
|
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
|
||||||
import org.elasticsearch.xpack.core.indexing.IndexerState;
|
|
||||||
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
|
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -228,6 +228,10 @@ public class RollupIndexerStateTests extends ESTestCase {
|
||||||
ESTestCase.awaitBusy(() -> indexer.getState() == IndexerState.STARTED);
|
ESTestCase.awaitBusy(() -> indexer.getState() == IndexerState.STARTED);
|
||||||
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
||||||
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
||||||
|
assertThat(indexer.getStats().getIndexFailures(), equalTo(0L));
|
||||||
|
assertThat(indexer.getStats().getSearchFailures(), equalTo(0L));
|
||||||
|
assertThat(indexer.getStats().getSearchTotal(), equalTo(1L));
|
||||||
|
assertThat(indexer.getStats().getIndexTotal(), equalTo(0L));
|
||||||
assertTrue(indexer.abort());
|
assertTrue(indexer.abort());
|
||||||
} finally {
|
} finally {
|
||||||
executor.shutdownNow();
|
executor.shutdownNow();
|
||||||
|
@ -257,6 +261,9 @@ public class RollupIndexerStateTests extends ESTestCase {
|
||||||
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
|
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
|
||||||
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
||||||
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
||||||
|
assertThat(indexer.getStats().getIndexFailures(), equalTo(0L));
|
||||||
|
assertThat(indexer.getStats().getSearchFailures(), equalTo(0L));
|
||||||
|
assertThat(indexer.getStats().getSearchTotal(), equalTo(1L));
|
||||||
assertTrue(indexer.abort());
|
assertTrue(indexer.abort());
|
||||||
} finally {
|
} finally {
|
||||||
executor.shutdownNow();
|
executor.shutdownNow();
|
||||||
|
@ -339,6 +346,7 @@ public class RollupIndexerStateTests extends ESTestCase {
|
||||||
assertThat(indexer.getState(), equalTo(IndexerState.ABORTING));
|
assertThat(indexer.getState(), equalTo(IndexerState.ABORTING));
|
||||||
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
||||||
assertThat(indexer.getStats().getNumPages(), equalTo(0L));
|
assertThat(indexer.getStats().getNumPages(), equalTo(0L));
|
||||||
|
assertThat(indexer.getStats().getSearchFailures(), equalTo(0L));
|
||||||
} finally {
|
} finally {
|
||||||
executor.shutdownNow();
|
executor.shutdownNow();
|
||||||
}
|
}
|
||||||
|
@ -638,6 +646,9 @@ public class RollupIndexerStateTests extends ESTestCase {
|
||||||
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
||||||
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
||||||
|
|
||||||
|
// There should be one recorded failure
|
||||||
|
assertThat(indexer.getStats().getSearchFailures(), equalTo(1L));
|
||||||
|
|
||||||
// Note: no docs were indexed
|
// Note: no docs were indexed
|
||||||
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
|
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
|
||||||
assertTrue(indexer.abort());
|
assertTrue(indexer.abort());
|
||||||
|
@ -742,6 +753,9 @@ public class RollupIndexerStateTests extends ESTestCase {
|
||||||
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
||||||
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
||||||
|
|
||||||
|
// There should be one recorded failure
|
||||||
|
assertThat(indexer.getStats().getSearchFailures(), equalTo(1L));
|
||||||
|
|
||||||
// Note: no docs were indexed
|
// Note: no docs were indexed
|
||||||
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
|
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
|
||||||
assertTrue(indexer.abort());
|
assertTrue(indexer.abort());
|
||||||
|
@ -784,6 +798,9 @@ public class RollupIndexerStateTests extends ESTestCase {
|
||||||
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
|
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
|
||||||
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
||||||
|
|
||||||
|
// There should be one recorded failure
|
||||||
|
assertThat(indexer.getStats().getSearchFailures(), equalTo(1L));
|
||||||
|
|
||||||
// Note: no pages processed, no docs were indexed
|
// Note: no pages processed, no docs were indexed
|
||||||
assertThat(indexer.getStats().getNumPages(), equalTo(0L));
|
assertThat(indexer.getStats().getNumPages(), equalTo(0L));
|
||||||
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
|
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
|
||||||
|
@ -895,6 +912,9 @@ public class RollupIndexerStateTests extends ESTestCase {
|
||||||
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
|
||||||
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
assertThat(indexer.getStats().getNumPages(), equalTo(1L));
|
||||||
|
|
||||||
|
// There should be one recorded failure
|
||||||
|
assertThat(indexer.getStats().getIndexFailures(), equalTo(1L));
|
||||||
|
|
||||||
// Note: no docs were indexed
|
// Note: no docs were indexed
|
||||||
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
|
assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
|
||||||
assertTrue(indexer.abort());
|
assertTrue(indexer.abort());
|
||||||
|
|
|
@ -70,6 +70,12 @@ setup:
|
||||||
documents_processed: 0
|
documents_processed: 0
|
||||||
rollups_indexed: 0
|
rollups_indexed: 0
|
||||||
trigger_count: 0
|
trigger_count: 0
|
||||||
|
search_failures: 0
|
||||||
|
index_failures: 0
|
||||||
|
index_time_in_ms: 0
|
||||||
|
index_total: 0
|
||||||
|
search_time_in_ms: 0
|
||||||
|
search_total: 0
|
||||||
status:
|
status:
|
||||||
job_state: "stopped"
|
job_state: "stopped"
|
||||||
upgraded_doc_id: true
|
upgraded_doc_id: true
|
||||||
|
@ -117,6 +123,12 @@ setup:
|
||||||
documents_processed: 0
|
documents_processed: 0
|
||||||
rollups_indexed: 0
|
rollups_indexed: 0
|
||||||
trigger_count: 0
|
trigger_count: 0
|
||||||
|
search_failures: 0
|
||||||
|
index_failures: 0
|
||||||
|
index_time_in_ms: 0
|
||||||
|
index_total: 0
|
||||||
|
search_time_in_ms: 0
|
||||||
|
search_total: 0
|
||||||
status:
|
status:
|
||||||
job_state: "stopped"
|
job_state: "stopped"
|
||||||
upgraded_doc_id: true
|
upgraded_doc_id: true
|
||||||
|
@ -164,6 +176,12 @@ setup:
|
||||||
documents_processed: 0
|
documents_processed: 0
|
||||||
rollups_indexed: 0
|
rollups_indexed: 0
|
||||||
trigger_count: 0
|
trigger_count: 0
|
||||||
|
search_failures: 0
|
||||||
|
index_failures: 0
|
||||||
|
index_time_in_ms: 0
|
||||||
|
index_total: 0
|
||||||
|
search_time_in_ms: 0
|
||||||
|
search_total: 0
|
||||||
status:
|
status:
|
||||||
job_state: "stopped"
|
job_state: "stopped"
|
||||||
upgraded_doc_id: true
|
upgraded_doc_id: true
|
||||||
|
|
|
@ -71,6 +71,12 @@ setup:
|
||||||
documents_processed: 0
|
documents_processed: 0
|
||||||
rollups_indexed: 0
|
rollups_indexed: 0
|
||||||
trigger_count: 0
|
trigger_count: 0
|
||||||
|
search_failures: 0
|
||||||
|
index_failures: 0
|
||||||
|
index_time_in_ms: 0
|
||||||
|
index_total: 0
|
||||||
|
search_time_in_ms: 0
|
||||||
|
search_total: 0
|
||||||
status:
|
status:
|
||||||
job_state: "stopped"
|
job_state: "stopped"
|
||||||
upgraded_doc_id: true
|
upgraded_doc_id: true
|
||||||
|
@ -208,6 +214,12 @@ setup:
|
||||||
documents_processed: 0
|
documents_processed: 0
|
||||||
rollups_indexed: 0
|
rollups_indexed: 0
|
||||||
trigger_count: 0
|
trigger_count: 0
|
||||||
|
search_failures: 0
|
||||||
|
index_failures: 0
|
||||||
|
index_time_in_ms: 0
|
||||||
|
index_total: 0
|
||||||
|
search_time_in_ms: 0
|
||||||
|
search_total: 0
|
||||||
status:
|
status:
|
||||||
job_state: "stopped"
|
job_state: "stopped"
|
||||||
upgraded_doc_id: true
|
upgraded_doc_id: true
|
||||||
|
|
|
@ -71,6 +71,12 @@ setup:
|
||||||
documents_processed: 0
|
documents_processed: 0
|
||||||
rollups_indexed: 0
|
rollups_indexed: 0
|
||||||
trigger_count: 0
|
trigger_count: 0
|
||||||
|
search_failures: 0
|
||||||
|
index_failures: 0
|
||||||
|
index_time_in_ms: 0
|
||||||
|
index_total: 0
|
||||||
|
search_time_in_ms: 0
|
||||||
|
search_total: 0
|
||||||
status:
|
status:
|
||||||
job_state: "stopped"
|
job_state: "stopped"
|
||||||
upgraded_doc_id: true
|
upgraded_doc_id: true
|
||||||
|
|
Loading…
Reference in New Issue