[Rollup] Re-factor Rollup Indexer into a generic indexer for re-usability (#32743)
This extracts a superclass out of the rollup indexer, called AsyncTwoPhaseIndexer. The implementor defines the query, the transformation of the response, the indexing, and the object that persists the position/state of the indexer. The stats object used by the indexer to record progress is now abstract as well, allowing the implementation to provide custom stats beyond what the indexer records, and to decide how the stats are presented (toXContent() is left to the implementation). This should allow new projects to reuse the search-then-index persistent task that Rollup uses, but without the restrictions/baggage of how Rollup has to work internally to satisfy time-based rollups.
This commit is contained in:
parent b52818ec6f
commit cfc003d485
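To make the new contract concrete before diving into the diff, here is a minimal sketch of a subclass. It is illustrative only and not part of this commit: the SimpleStats and SimpleIndexer names, the "source-index" index name, and the elided client wiring are all hypothetical; the sketch merely exercises the abstract methods that AsyncTwoPhaseIndexer introduces below.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerJobStats;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stats subclass: the base class keeps the counters, the
// implementation decides how they are rendered (toXContent is abstract).
class SimpleStats extends IndexerJobStats {
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field("pages_processed", getNumPages());
        builder.endObject();
        return builder;
    }
}

// Hypothetical indexer: the position type is a plain Long here, but any
// persistable type works (Rollup uses Map<String, Object>, as the diff shows).
class SimpleIndexer extends AsyncTwoPhaseIndexer<Long, SimpleStats> {

    SimpleIndexer(Executor executor, AtomicReference<IndexerState> state, Long initialPosition) {
        super(executor, state, initialPosition, new SimpleStats());
    }

    @Override
    protected String getJobId() {
        return "simple-job"; // used for logging only
    }

    @Override
    protected void onStartJob(long now) {
        // compute any per-run boundaries from 'now' before the first search
    }

    @Override
    protected SearchRequest buildSearchRequest() {
        // query the source from the current position; a real implementation
        // would range-filter on getPosition()
        return new SearchRequest("source-index");
    }

    @Override
    protected IterationResult<Long> doProcess(SearchResponse searchResponse) {
        // turn hits into IndexRequests; isDone == true sends the indexer back
        // to STARTED and fires onFinish() once the state has been persisted
        return new IterationResult<>(Collections.emptyList(), getPosition(), true);
    }

    @Override
    protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
        // typically client.search(request, nextPhase); elided in this sketch
    }

    @Override
    protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
        // typically client.bulk(request, nextPhase); elided in this sketch
    }

    @Override
    protected void doSaveState(IndexerState state, Long position, Runnable next) {
        next.run(); // persist state and position, then continue the loop
    }

    @Override
    protected void onFailure(Exception exc) { }

    @Override
    protected void onFinish() { }

    @Override
    protected void onAbort() { }
}

With such a subclass, the lifecycle is the one described in the Javadoc of the new class: start() flips STOPPED to STARTED, maybeTriggerAsyncJob(System.currentTimeMillis()) launches the search-then-index loop on the executor, and stop()/abort() end it, without or with aborting the task.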
@@ -0,0 +1,385 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.xpack.core.indexing;

import org.apache.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/**
 * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)};
 * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
 * Only one background job can run simultaneously and {@link #onFinish()} is called when the job
 * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
 * aborted while a job is running. The indexer must be started ({@link #start()}) to allow a background job to run when
 * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
 *
 * In a nutshell this is a two-cycle engine: first it sends a query, then it indexes documents based on the response, sends the next query,
 * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
 *
 * @param <JobPosition> Type that defines a job position to be defined by the implementation.
 */
public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends IndexerJobStats> {
    private static final Logger logger = Logger.getLogger(AsyncTwoPhaseIndexer.class.getName());

    private final JobStats stats;

    private final AtomicReference<IndexerState> state;
    private final AtomicReference<JobPosition> position;
    private final Executor executor;

    protected AsyncTwoPhaseIndexer(Executor executor, AtomicReference<IndexerState> initialState,
                                   JobPosition initialPosition, JobStats jobStats) {
        this.executor = executor;
        this.state = initialState;
        this.position = new AtomicReference<>(initialPosition);
        this.stats = jobStats;
    }

    /**
     * Get the current state of the indexer.
     */
    public IndexerState getState() {
        return state.get();
    }

    /**
     * Get the current position of the indexer.
     */
    public JobPosition getPosition() {
        return position.get();
    }

    /**
     * Get the stats of this indexer.
     */
    public JobStats getStats() {
        return stats;
    }

    /**
     * Sets the internal state to {@link IndexerState#STARTED} if the previous state
     * was {@link IndexerState#STOPPED}. Setting the state to STARTED allows a job
     * to run in the background when {@link #maybeTriggerAsyncJob(long)} is called.
     *
     * @return The new state for the indexer (STARTED, INDEXING or ABORTING if the
     *         job was already aborted).
     */
    public synchronized IndexerState start() {
        state.compareAndSet(IndexerState.STOPPED, IndexerState.STARTED);
        return state.get();
    }

    /**
     * Sets the internal state to {@link IndexerState#STOPPING} if an async job is
     * running in the background and in such case {@link #onFinish()} will be called
     * as soon as the background job detects that the indexer is stopped. If there
     * is no job running when this function is called, the state is directly set to
     * {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called.
     *
     * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the
     *         job was already aborted).
     */
    public synchronized IndexerState stop() {
        IndexerState currentState = state.updateAndGet(previousState -> {
            if (previousState == IndexerState.INDEXING) {
                return IndexerState.STOPPING;
            } else if (previousState == IndexerState.STARTED) {
                return IndexerState.STOPPED;
            } else {
                return previousState;
            }
        });
        return currentState;
    }

    /**
     * Sets the internal state to {@link IndexerState#ABORTING}. It returns false if
     * an async job is running in the background and in such case {@link #onAbort}
     * will be called as soon as the background job detects that the indexer is
     * aborted. If there is no job running when this function is called, it returns
     * true and {@link #onAbort()} will never be called.
     *
     * @return true if the indexer is aborted, false if a background job is running
     *         and abort is delayed.
     */
    public synchronized boolean abort() {
        IndexerState prevState = state.getAndUpdate((prev) -> IndexerState.ABORTING);
        return prevState == IndexerState.STOPPED || prevState == IndexerState.STARTED;
    }

    /**
     * Triggers a background job that builds the index asynchronously iff
     * there is no other job that runs and the indexer is started
     * ({@link IndexerState#STARTED}).
     *
     * @param now
     *            The current time in milliseconds (used to limit the job to
     *            complete buckets)
     * @return true if a job has been triggered, false otherwise
     */
    public synchronized boolean maybeTriggerAsyncJob(long now) {
        final IndexerState currentState = state.get();
        switch (currentState) {
            case INDEXING:
            case STOPPING:
            case ABORTING:
                logger.warn("Schedule was triggered for job [" + getJobId() + "], but prior indexer is still running.");
                return false;

            case STOPPED:
                logger.debug("Schedule was triggered for job [" + getJobId() + "] but job is stopped. Ignoring trigger.");
                return false;

            case STARTED:
                logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
                stats.incrementNumInvocations(1);
                onStartJob(now);

                if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
                    // fire off the search. Note this is async, the method will return from here
                    executor.execute(() -> doNextSearch(buildSearchRequest(),
                            ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc))));
                    logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
                    return true;
                } else {
                    logger.debug("Could not move from STARTED to INDEXING state because current state is [" + state.get() + "]");
                    return false;
                }

            default:
                logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
                throw new IllegalStateException("Job encountered an illegal state [" + currentState + "]");
        }
    }

    /**
     * Called to get the Id of the job, used for logging.
     *
     * @return a string with the id of the job
     */
    protected abstract String getJobId();

    /**
     * Called to process a response from the one search request in order to turn it into a {@link IterationResult}.
     *
     * @param searchResponse response from the search phase.
     * @return Iteration object to be passed to indexing phase.
     */
    protected abstract IterationResult<JobPosition> doProcess(SearchResponse searchResponse);

    /**
     * Called to build the next search request.
     *
     * @return SearchRequest to be passed to the search phase.
     */
    protected abstract SearchRequest buildSearchRequest();

    /**
     * Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the
     * internal state is {@link IndexerState#STARTED}.
     *
     * @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)}
     */
    protected abstract void onStartJob(long now);

    /**
     * Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the
     * response or the exception if an error occurs.
     *
     * @param request
     *            The search request to execute
     * @param nextPhase
     *            Listener for the next phase
     */
    protected abstract void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase);

    /**
     * Executes the {@link BulkRequest} and calls <code>nextPhase</code> with the
     * response or the exception if an error occurs.
     *
     * @param request
     *            The bulk request to execute
     * @param nextPhase
     *            Listener for the next phase
     */
    protected abstract void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase);

    /**
     * Called periodically during the execution of a background job. Implementation
     * should persist the state somewhere and continue the execution asynchronously
     * using <code>next</code>.
     *
     * @param state
     *            The current state of the indexer
     * @param position
     *            The current position of the indexer
     * @param next
     *            Runnable for the next phase
     */
    protected abstract void doSaveState(IndexerState state, JobPosition position, Runnable next);

    /**
     * Called when a failure occurs in an async job causing the execution to stop.
     *
     * @param exc
     *            The exception
     */
    protected abstract void onFailure(Exception exc);

    /**
     * Called when a background job finishes.
     */
    protected abstract void onFinish();

    /**
     * Called when a background job detects that the indexer is aborted causing the
     * async execution to stop.
     */
    protected abstract void onAbort();

    private void finishWithFailure(Exception exc) {
        doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
    }

    private IndexerState finishAndSetState() {
        return state.updateAndGet(prev -> {
            switch (prev) {
                case INDEXING:
                    // ready for another job
                    return IndexerState.STARTED;

                case STOPPING:
                    // must be started again
                    return IndexerState.STOPPED;

                case ABORTING:
                    // abort and exit
                    onAbort();
                    return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first

                case STOPPED:
                    // No-op. Shouldn't really be possible to get here (should have to go through
                    // STOPPING first which will be handled) but is harmless to no-op and we don't
                    // want to throw exception here
                    return IndexerState.STOPPED;

                default:
                    // any other state is unanticipated at this point
                    throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]");
            }
        });
    }

    private void onSearchResponse(SearchResponse searchResponse) {
        try {
            if (checkState(getState()) == false) {
                return;
            }
            if (searchResponse.getShardFailures().length != 0) {
                throw new RuntimeException("Shard failures encountered while running indexer for job [" + getJobId() + "]: "
                        + Arrays.toString(searchResponse.getShardFailures()));
            }

            stats.incrementNumPages(1);
            IterationResult<JobPosition> iterationResult = doProcess(searchResponse);

            if (iterationResult.isDone()) {
                logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");

                // Change state first, then try to persist. This prevents in-progress
                // STOPPING/ABORTING from being persisted as STARTED but then stop the job
                doSaveState(finishAndSetState(), position.get(), this::onFinish);
                return;
            }

            final List<IndexRequest> docs = iterationResult.getToIndex();
            final BulkRequest bulkRequest = new BulkRequest();
            docs.forEach(bulkRequest::add);

            // TODO this might be a valid case, e.g. if implementation filters
            assert bulkRequest.requests().size() > 0;

            doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
                // TODO we should check items in the response and move after accordingly to
                // resume the failing buckets ?
                if (bulkResponse.hasFailures()) {
                    logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
                }
                stats.incrementNumOutputDocuments(bulkResponse.getItems().length);
                if (checkState(getState()) == false) {
                    return;
                }

                JobPosition newPosition = iterationResult.getPosition();
                position.set(newPosition);

                onBulkResponse(bulkResponse, newPosition);
            }, exc -> finishWithFailure(exc)));
        } catch (Exception e) {
            finishWithFailure(e);
        }
    }

    private void onBulkResponse(BulkResponse response, JobPosition position) {
        try {
            ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithFailure);
            // TODO probably something more intelligent than every-50 is needed
            if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
                doSaveState(IndexerState.INDEXING, position, () -> doNextSearch(buildSearchRequest(), listener));
            } else {
                doNextSearch(buildSearchRequest(), listener);
            }
        } catch (Exception e) {
            finishWithFailure(e);
        }
    }

    /**
     * Checks the {@link IndexerState} and returns false if the execution should be
     * stopped.
     */
    private boolean checkState(IndexerState currentState) {
        switch (currentState) {
            case INDEXING:
                // normal state;
                return true;

            case STOPPING:
                logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer.");
                doSaveState(finishAndSetState(), getPosition(), () -> {
                });
                return false;

            case STOPPED:
                return false;

            case ABORTING:
                logger.info("Requested shutdown of indexer for job [" + getJobId() + "]");
                onAbort();
                return false;

            default:
                // Anything other than indexing, aborting or stopping is unanticipated
                logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
                throw new IllegalStateException("Indexer job encountered an illegal state [" + currentState + "]");
        }
    }

}
@@ -0,0 +1,114 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;

import java.io.IOException;
import java.util.Objects;

/**
 * This class holds the runtime statistics of a job. The stats are not used by any internal process
 * and are only for external monitoring/reference. Statistics are not persisted with the job, so if the
 * allocated task is shut down/restarted on a different node all the stats will reset.
 */
public abstract class IndexerJobStats implements ToXContentObject, Writeable {

    public static final ParseField NAME = new ParseField("job_stats");

    protected long numPages = 0;
    protected long numInputDocuments = 0;
    protected long numOuputDocuments = 0;
    protected long numInvocations = 0;

    public IndexerJobStats() {
    }

    public IndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
        this.numPages = numPages;
        this.numInputDocuments = numInputDocuments;
        this.numOuputDocuments = numOuputDocuments;
        this.numInvocations = numInvocations;
    }

    public IndexerJobStats(StreamInput in) throws IOException {
        this.numPages = in.readVLong();
        this.numInputDocuments = in.readVLong();
        this.numOuputDocuments = in.readVLong();
        this.numInvocations = in.readVLong();
    }

    public long getNumPages() {
        return numPages;
    }

    public long getNumDocuments() {
        return numInputDocuments;
    }

    public long getNumInvocations() {
        return numInvocations;
    }

    public long getOutputDocuments() {
        return numOuputDocuments;
    }

    public void incrementNumPages(long n) {
        assert (n >= 0);
        numPages += n;
    }

    public void incrementNumDocuments(long n) {
        assert (n >= 0);
        numInputDocuments += n;
    }

    public void incrementNumInvocations(long n) {
        assert (n >= 0);
        numInvocations += n;
    }

    public void incrementNumOutputDocuments(long n) {
        assert (n >= 0);
        numOuputDocuments += n;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(numPages);
        out.writeVLong(numInputDocuments);
        out.writeVLong(numOuputDocuments);
        out.writeVLong(numInvocations);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }

        if (other == null || getClass() != other.getClass()) {
            return false;
        }

        IndexerJobStats that = (IndexerJobStats) other;

        return Objects.equals(this.numPages, that.numPages)
                && Objects.equals(this.numInputDocuments, that.numInputDocuments)
                && Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
                && Objects.equals(this.numInvocations, that.numInvocations);
    }

    @Override
    public int hashCode() {
        return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
    }
}
@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.core.rollup.job;
+package org.elasticsearch.xpack.core.indexing;

 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -0,0 +1,62 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.action.index.IndexRequest;

import java.util.List;

/**
 * Result object to hold the result of one iteration of iterative indexing.
 * Acts as an interface between the implementation and the generic indexer.
 */
public class IterationResult<JobPosition> {

    private final boolean isDone;
    private final JobPosition position;
    private final List<IndexRequest> toIndex;

    /**
     * Constructor for the result of one iteration.
     *
     * @param toIndex the list of requests to be indexed
     * @param position the extracted, persistable position of the job required for the search phase
     * @param isDone true if the source is exhausted and the job should go to sleep
     *
     * Note: toIndex.isEmpty() != isDone, due to possible filtering in the specific implementation
     */
    public IterationResult(List<IndexRequest> toIndex, JobPosition position, boolean isDone) {
        this.toIndex = toIndex;
        this.position = position;
        this.isDone = isDone;
    }

    /**
     * Returns true if this indexing iteration is done and the job should go into sleep mode.
     */
    public boolean isDone() {
        return isDone;
    }

    /**
     * Return the position of the job, a generic to be passed to the next query construction.
     *
     * @return the position
     */
    public JobPosition getPosition() {
        return position;
    }

    /**
     * List of requests to be passed to bulk indexing.
     *
     * @return List of index requests.
     */
    public List<IndexRequest> getToIndex() {
        return toIndex;
    }
}
@@ -26,8 +26,8 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xpack.core.rollup.RollupField;
+import org.elasticsearch.xpack.core.rollup.job.RollupIndexerJobStats;
 import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
-import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
 import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;

 import java.io.IOException;

@@ -174,7 +174,14 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
-            builder.field(JOBS.getPreferredName(), jobs);
+
+            // XContentBuilder does not support passing the params object for Iterables
+            builder.field(JOBS.getPreferredName());
+            builder.startArray();
+            for (JobWrapper job : jobs) {
+                job.toXContent(builder, params);
+            }
+            builder.endArray();
             builder.endObject();
             return builder;
         }

@@ -204,20 +211,20 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {

     public static class JobWrapper implements Writeable, ToXContentObject {
         private final RollupJobConfig job;
-        private final RollupJobStats stats;
+        private final RollupIndexerJobStats stats;
         private final RollupJobStatus status;

         public static final ConstructingObjectParser<JobWrapper, Void> PARSER
                 = new ConstructingObjectParser<>(NAME, a -> new JobWrapper((RollupJobConfig) a[0],
-                        (RollupJobStats) a[1], (RollupJobStatus)a[2]));
+                        (RollupIndexerJobStats) a[1], (RollupJobStatus)a[2]));

         static {
             PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> RollupJobConfig.fromXContent(p, null), CONFIG);
-            PARSER.declareObject(ConstructingObjectParser.constructorArg(), RollupJobStats.PARSER::apply, STATS);
+            PARSER.declareObject(ConstructingObjectParser.constructorArg(), RollupIndexerJobStats.PARSER::apply, STATS);
             PARSER.declareObject(ConstructingObjectParser.constructorArg(), RollupJobStatus.PARSER::apply, STATUS);
         }

-        public JobWrapper(RollupJobConfig job, RollupJobStats stats, RollupJobStatus status) {
+        public JobWrapper(RollupJobConfig job, RollupIndexerJobStats stats, RollupJobStatus status) {
             this.job = job;
             this.stats = stats;
             this.status = status;

@@ -225,7 +232,7 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {

         public JobWrapper(StreamInput in) throws IOException {
             this.job = new RollupJobConfig(in);
-            this.stats = new RollupJobStats(in);
+            this.stats = new RollupIndexerJobStats(in);
             this.status = new RollupJobStatus(in);
         }

@@ -240,7 +247,7 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {
             return job;
         }

-        public RollupJobStats getStats() {
+        public RollupIndexerJobStats getStats() {
             return stats;
         }

@@ -254,7 +261,7 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {
             builder.field(CONFIG.getPreferredName());
             job.toXContent(builder, params);
             builder.field(STATUS.getPreferredName(), status);
-            builder.field(STATS.getPreferredName(), stats);
+            builder.field(STATS.getPreferredName(), stats, params);
             builder.endObject();
             return builder;
         }

@@ -0,0 +1,70 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.core.rollup.job;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.indexing.IndexerJobStats;

import java.io.IOException;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

/**
 * The Rollup specialization of stats for the AsyncTwoPhaseIndexer.
 * Note: instead of `documents_indexed`, this XContent shows `rollups_indexed`.
 */
public class RollupIndexerJobStats extends IndexerJobStats {
    private static ParseField NUM_PAGES = new ParseField("pages_processed");
    private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
    private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("rollups_indexed");
    private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");

    public static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER =
            new ConstructingObjectParser<>(NAME.getPreferredName(),
                    args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));

    static {
        PARSER.declareLong(constructorArg(), NUM_PAGES);
        PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
        PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
        PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
    }

    public RollupIndexerJobStats() {
        super();
    }

    public RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
        super(numPages, numInputDocuments, numOuputDocuments, numInvocations);
    }

    public RollupIndexerJobStats(StreamInput in) throws IOException {
        super(in);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(NUM_PAGES.getPreferredName(), numPages);
        builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments);
        builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments);
        builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations);
        builder.endObject();
        return builder;
    }

    public static RollupIndexerJobStats fromXContent(XContentParser parser) {
        try {
            return PARSER.parse(parser, null);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
@@ -1,156 +0,0 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.core.rollup.job;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

/**
 * This class holds the runtime statistics of a job. The stats are not used by any internal process
 * and are only for external monitoring/reference. Statistics are not persisted with the job, so if the
 * allocated task is shutdown/restarted on a different node all the stats will reset.
 */
public class RollupJobStats implements ToXContentObject, Writeable {

    public static final ParseField NAME = new ParseField("job_stats");

    private static ParseField NUM_PAGES = new ParseField("pages_processed");
    private static ParseField NUM_DOCUMENTS = new ParseField("documents_processed");
    private static ParseField NUM_ROLLUPS = new ParseField("rollups_indexed");
    private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");

    private long numPages = 0;
    private long numDocuments = 0;
    private long numRollups = 0;
    private long numInvocations = 0;

    public static final ConstructingObjectParser<RollupJobStats, Void> PARSER =
            new ConstructingObjectParser<>(NAME.getPreferredName(),
                    args -> new RollupJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));

    static {
        PARSER.declareLong(constructorArg(), NUM_PAGES);
        PARSER.declareLong(constructorArg(), NUM_DOCUMENTS);
        PARSER.declareLong(constructorArg(), NUM_ROLLUPS);
        PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
    }

    public RollupJobStats() {
    }

    public RollupJobStats(long numPages, long numDocuments, long numRollups, long numInvocations) {
        this.numPages = numPages;
        this.numDocuments = numDocuments;
        this.numRollups = numRollups;
        this.numInvocations = numInvocations;
    }

    public RollupJobStats(StreamInput in) throws IOException {
        this.numPages = in.readVLong();
        this.numDocuments = in.readVLong();
        this.numRollups = in.readVLong();
        this.numInvocations = in.readVLong();
    }

    public long getNumPages() {
        return numPages;
    }

    public long getNumDocuments() {
        return numDocuments;
    }

    public long getNumInvocations() {
        return numInvocations;
    }

    public long getNumRollups() {
        return numRollups;
    }

    public void incrementNumPages(long n) {
        assert (n >= 0);
        numPages += n;
    }

    public void incrementNumDocuments(long n) {
        assert (n >= 0);
        numDocuments += n;
    }

    public void incrementNumInvocations(long n) {
        assert (n >= 0);
        numInvocations += n;
    }

    public void incrementNumRollups(long n) {
        assert (n >= 0);
        numRollups += n;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(numPages);
        out.writeVLong(numDocuments);
        out.writeVLong(numRollups);
        out.writeVLong(numInvocations);
    }

    public static RollupJobStats fromXContent(XContentParser parser) {
        try {
            return PARSER.parse(parser, null);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(NUM_PAGES.getPreferredName(), numPages);
        builder.field(NUM_DOCUMENTS.getPreferredName(), numDocuments);
        builder.field(NUM_ROLLUPS.getPreferredName(), numRollups);
        builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations);
        builder.endObject();
        return builder;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }

        if (other == null || getClass() != other.getClass()) {
            return false;
        }

        RollupJobStats that = (RollupJobStats) other;

        return Objects.equals(this.numPages, that.numPages)
                && Objects.equals(this.numDocuments, that.numDocuments)
                && Objects.equals(this.numRollups, that.numRollups)
                && Objects.equals(this.numInvocations, that.numInvocations);
    }

    @Override
    public int hashCode() {
        return Objects.hash(numPages, numDocuments, numRollups, numInvocations);
    }

}
@@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.persistent.PersistentTaskState;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.xpack.core.indexing.IndexerState;

 import java.io.IOException;
 import java.util.HashMap;
@@ -0,0 +1,143 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

package org.elasticsearch.xpack.core.indexing;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;

public class AsyncTwoPhaseIndexerTests extends ESTestCase {

    AtomicBoolean isFinished = new AtomicBoolean(false);

    private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {

        // test the execution order
        private int step;

        protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition) {
            super(executor, initialState, initialPosition, new MockJobStats());
        }

        @Override
        protected String getJobId() {
            return "mock";
        }

        @Override
        protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
            assertThat(step, equalTo(3));
            ++step;
            return new IterationResult<Integer>(Collections.emptyList(), 3, true);
        }

        @Override
        protected SearchRequest buildSearchRequest() {
            assertThat(step, equalTo(1));
            ++step;
            return null;
        }

        @Override
        protected void onStartJob(long now) {
            assertThat(step, equalTo(0));
            ++step;
        }

        @Override
        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
            assertThat(step, equalTo(2));
            ++step;
            final SearchResponseSections sections = new SearchResponseSections(new SearchHits(new SearchHit[0], 0, 0), null, null, false,
                    null, null, 1);

            nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null));
        }

        @Override
        protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
            fail("should not be called");
        }

        @Override
        protected void doSaveState(IndexerState state, Integer position, Runnable next) {
            assertThat(step, equalTo(4));
            ++step;
            next.run();
        }

        @Override
        protected void onFailure(Exception exc) {
            fail(exc.getMessage());
        }

        @Override
        protected void onFinish() {
            assertThat(step, equalTo(5));
            ++step;
            isFinished.set(true);
        }

        @Override
        protected void onAbort() {
        }

        public int getStep() {
            return step;
        }

    }

    private static class MockJobStats extends IndexerJobStats {

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            return null;
        }
    }

    public void testStateMachine() throws InterruptedException {
        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
        final ExecutorService executor = Executors.newFixedThreadPool(1);

        try {

            MockIndexer indexer = new MockIndexer(executor, state, 2);
            indexer.start();
            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
            assertThat(indexer.getPosition(), equalTo(2));
            ESTestCase.awaitBusy(() -> isFinished.get());
            assertThat(indexer.getStep(), equalTo(6));
            assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
            assertThat(indexer.getStats().getNumPages(), equalTo(1L));
            assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
            assertTrue(indexer.abort());
        } finally {
            executor.shutdownNow();
        }
    }
}
@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.core.rollup.job;
+package org.elasticsearch.xpack.core.indexing;

 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.rollup.job;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
 import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction;

@@ -40,7 +41,8 @@ public class JobWrapperSerializingTests extends AbstractSerializingTestCase<GetR
         }

         return new GetRollupJobsAction.JobWrapper(ConfigTestHelpers.randomRollupJobConfig(random()),
-                new RollupJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()),
+                new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
+                        randomNonNegativeLong(), randomNonNegativeLong()),
                 new RollupJobStatus(state, Collections.emptyMap(), randomBoolean()));
     }
 }
@@ -0,0 +1,34 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.core.rollup.job;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;

public class RollupIndexerJobStatsTests extends AbstractSerializingTestCase<RollupIndexerJobStats> {

    @Override
    protected RollupIndexerJobStats createTestInstance() {
        return randomStats();
    }

    @Override
    protected Writeable.Reader<RollupIndexerJobStats> instanceReader() {
        return RollupIndexerJobStats::new;
    }

    @Override
    protected RollupIndexerJobStats doParseInstance(XContentParser parser) {
        return RollupIndexerJobStats.fromXContent(parser);
    }

    public static RollupIndexerJobStats randomStats() {
        return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
                randomNonNegativeLong(), randomNonNegativeLong());
    }

}
@ -1,35 +0,0 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.rollup.job;
|
||||
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
|
||||
|
||||
public class RollupJobStatsTests extends AbstractSerializingTestCase<RollupJobStats> {
|
||||
|
||||
@Override
|
||||
protected RollupJobStats createTestInstance() {
|
||||
return randomStats();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<RollupJobStats> instanceReader() {
|
||||
return RollupJobStats::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RollupJobStats doParseInstance(XContentParser parser) {
|
||||
return RollupJobStats.fromXContent(parser);
|
||||
}
|
||||
|
||||
public static RollupJobStats randomStats() {
|
||||
return new RollupJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
|
||||
randomNonNegativeLong(), randomNonNegativeLong());
|
||||
}
|
||||
}
|
||||
|
@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.rollup.job;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.indexing.IndexerState;

 import java.util.HashMap;
 import java.util.Map;
@@ -17,7 +17,7 @@ import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggre
 import org.elasticsearch.xpack.core.rollup.RollupField;
 import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
 import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
-import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
+import org.elasticsearch.xpack.core.rollup.job.RollupIndexerJobStats;
 import org.elasticsearch.xpack.rollup.Rollup;

 import java.util.ArrayList;

@@ -46,7 +46,7 @@ class IndexerUtils {
      * @param isUpgradedDocID `true` if this job is using the new ID scheme
      * @return A list of rolled documents derived from the response
      */
-    static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats,
+    static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats,
                                              GroupConfig groupConfig, String jobId, boolean isUpgradedDocID) {

         logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]");

@ -5,11 +5,6 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.rollup.job;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
@ -33,20 +28,22 @@ import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountAggreg
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.xpack.core.indexing.IndexerState;
|
||||
import org.elasticsearch.xpack.core.indexing.IterationResult;
|
||||
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
|
||||
import org.elasticsearch.xpack.core.rollup.RollupField;
|
||||
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
|
||||
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
|
||||
import org.elasticsearch.xpack.core.rollup.job.HistogramGroupConfig;
|
||||
import org.elasticsearch.xpack.core.rollup.job.IndexerState;
|
||||
import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
|
||||
import org.elasticsearch.xpack.core.rollup.job.RollupIndexerJobStats;
|
||||
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
|
||||
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
|
||||
import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
|
||||
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -54,30 +51,16 @@ import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
import static org.elasticsearch.xpack.core.rollup.RollupField.formatFieldName;
|
||||
|
||||
/**
|
||||
* An abstract class that builds a rollup index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
|
||||
* it will create the rollup index from the source index up to the last complete bucket that is allowed to be built (based on the current
|
||||
* time and the delay set on the rollup job). Only one background job can run simultaneously and {@link #onFinish()} is called when the job
|
||||
* finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
|
||||
* aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when
|
||||
* {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
|
||||
* An abstract implementation of {@link AsyncTwoPhaseIndexer} that builds a rollup index incrementally.
|
||||
*/
|
||||
public abstract class RollupIndexer {
|
||||
private static final Logger logger = Logger.getLogger(RollupIndexer.class.getName());
|
||||
|
||||
public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, RollupIndexerJobStats> {
|
||||
static final String AGGREGATION_NAME = RollupField.NAME;
|
||||
|
||||
private final RollupJob job;
|
||||
private final RollupJobStats stats;
|
||||
private final AtomicReference<IndexerState> state;
|
||||
private final AtomicReference<Map<String, Object>> position;
|
||||
private final Executor executor;
|
||||
protected final AtomicBoolean upgradedDocumentID;
|
||||
|
||||
private final CompositeAggregationBuilder compositeBuilder;
|
||||
private long maxBoundary;
|
||||
|
||||
@ -87,84 +70,16 @@ public abstract class RollupIndexer {
|
||||
* @param job The rollup job
|
||||
* @param initialState Initial state for the indexer
|
||||
* @param initialPosition The last indexed bucket of the task
|
||||
* @param upgradedDocumentID whether job has updated IDs (for BWC)
|
||||
*/
|
||||
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
|
||||
Map<String, Object> initialPosition, AtomicBoolean upgradedDocumentID) {
|
||||
this.executor = executor;
|
||||
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
|
||||
AtomicBoolean upgradedDocumentID) {
|
||||
super(executor, initialState, initialPosition, new RollupIndexerJobStats());
|
||||
this.job = job;
|
||||
this.stats = new RollupJobStats();
|
||||
this.state = initialState;
|
||||
this.position = new AtomicReference<>(initialPosition);
|
||||
this.compositeBuilder = createCompositeBuilder(job.getConfig());
|
||||
this.upgradedDocumentID = upgradedDocumentID;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the response
|
||||
* or the exception if an error occurs.
|
||||
*
|
||||
* @param request The search request to execute
|
||||
* @param nextPhase Listener for the next phase
|
||||
*/
|
||||
protected abstract void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase);
|
||||
|
||||
/**
|
||||
* Executes the {@link BulkRequest} and calls <code>nextPhase</code> with the response
|
||||
* or the exception if an error occurs.
|
||||
*
|
||||
* @param request The bulk request to execute
|
||||
* @param nextPhase Listener for the next phase
|
||||
*/
|
||||
protected abstract void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase);
|
||||
|
||||
/**
|
||||
* Called periodically during the execution of a background job. Implementation should
|
||||
* persists the state somewhere and continue the execution asynchronously using <code>next</code>.
|
||||
*
|
||||
* @param state The current state of the indexer
|
||||
* @param position The current position of the indexer
|
||||
* @param next Runnable for the next phase
|
||||
*/
|
||||
protected abstract void doSaveState(IndexerState state, Map<String, Object> position, Runnable next);
|
||||
|
||||
/**
|
||||
* Called when a failure occurs in an async job causing the execution to stop.
|
||||
* @param exc The exception
|
||||
*/
|
||||
protected abstract void onFailure(Exception exc);
|
||||
|
||||
/**
|
||||
* Called when a background job finishes.
|
||||
*/
|
||||
protected abstract void onFinish();
|
||||
|
||||
/**
|
||||
* Called when a background job detects that the indexer is aborted causing the async execution
|
||||
* to stop.
|
||||
*/
|
||||
protected abstract void onAbort();
|
||||
|
||||
/**
|
||||
* Get the current state of the indexer.
|
||||
*/
|
||||
public IndexerState getState() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current position of the indexer.
|
||||
*/
|
||||
public Map<String, Object> getPosition() {
|
||||
return position.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the stats of this indexer.
|
||||
*/
|
||||
public RollupJobStats getStats() {
|
||||
return stats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns if this job has upgraded it's ID scheme yet or not
|
||||
*/
|
||||
@ -172,229 +87,28 @@ public abstract class RollupIndexer {
|
||||
return upgradedDocumentID.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the internal state to {@link IndexerState#STARTED} if the previous state was {@link IndexerState#STOPPED}. Setting the state to
|
||||
* STARTED allows a job to run in the background when {@link #maybeTriggerAsyncJob(long)} is called.
|
||||
* @return The new state for the indexer (STARTED, INDEXING or ABORTING if the job was already aborted).
|
||||
*/
|
||||
public synchronized IndexerState start() {
|
||||
state.compareAndSet(IndexerState.STOPPED, IndexerState.STARTED);
|
||||
return state.get();
|
||||
@Override
|
||||
protected String getJobId() {
|
||||
return job.getConfig().getId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is running in the background and in such case
|
||||
* {@link #onFinish()} will be called as soon as the background job detects that the indexer is stopped. If there is no job running when
|
||||
* this function is called, the state is directly set to {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called.
|
||||
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
|
||||
*/
|
||||
public synchronized IndexerState stop() {
|
||||
IndexerState currentState = state.updateAndGet(previousState -> {
|
||||
if (previousState == IndexerState.INDEXING) {
|
||||
return IndexerState.STOPPING;
|
||||
} else if (previousState == IndexerState.STARTED) {
|
||||
return IndexerState.STOPPED;
|
||||
} else {
|
||||
return previousState;
|
||||
}
|
||||
});
|
||||
return currentState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the internal state to {@link IndexerState#ABORTING}. It returns false if an async job is running in the background and in such
|
||||
* case {@link #onAbort} will be called as soon as the background job detects that the indexer is aborted. If there is no job running
|
||||
* when this function is called, it returns true and {@link #onAbort()} will never be called.
|
||||
* @return true if the indexer is aborted, false if a background job is running and abort is delayed.
|
||||
*/
|
||||
public synchronized boolean abort() {
|
||||
IndexerState prevState = state.getAndUpdate((prev) -> IndexerState.ABORTING);
|
||||
return prevState == IndexerState.STOPPED || prevState == IndexerState.STARTED;
|
||||
}

    /**
     * Triggers a background job that builds the rollup index asynchronously iff no other job is running
     * and the indexer is started ({@link IndexerState#STARTED}).
     *
     * @param now The current time in milliseconds (used to limit the job to complete buckets)
     * @return true if a job has been triggered, false otherwise
     */
    public synchronized boolean maybeTriggerAsyncJob(long now) {
        final IndexerState currentState = state.get();
        switch (currentState) {
            case INDEXING:
            case STOPPING:
            case ABORTING:
                logger.warn("Schedule was triggered for rollup job [" + job.getConfig().getId() + "], but prior indexer is still running.");
                return false;

            case STOPPED:
                logger.debug("Schedule was triggered for rollup job [" + job.getConfig().getId()
                    + "] but job is stopped. Ignoring trigger.");
                return false;

            case STARTED:
                logger.debug("Schedule was triggered for rollup job [" + job.getConfig().getId() + "], state: [" + currentState + "]");
                // The only valid time to start indexing is when we are STARTED but not currently INDEXING.
                stats.incrementNumInvocations(1);

                // rounds the current time to its current bucket based on the date histogram interval.
                // this is needed to exclude buckets that can still receive new documents.
                DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
                long rounded = dateHisto.createRounding().round(now);
                if (dateHisto.getDelay() != null) {
                    // if the job has a delay we filter all documents that appear before it.
                    maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis();
                } else {
                    maxBoundary = rounded;
                }

                if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
                    // fire off the search. Note this is async, the method will return from here
                    executor.execute(() -> doNextSearch(buildSearchRequest(),
                        ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc))));
                    logger.debug("Beginning to rollup [" + job.getConfig().getId() + "], state: [" + currentState + "]");
                    return true;
                } else {
                    logger.debug("Could not move from STARTED to INDEXING state because current state is [" + state.get() + "]");
                    return false;
                }

            default:
                logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
                throw new IllegalStateException("Rollup job encountered an illegal state [" + currentState + "]");
    @Override
    protected void onStartJob(long now) {
        // this is needed to exclude buckets that can still receive new documents.
        DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
        long rounded = dateHisto.createRounding().round(now);
        if (dateHisto.getDelay() != null) {
            // if the job has a delay we filter all documents that appear before it.
            maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis();
        } else {
            maxBoundary = rounded;
        }
    }
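
    // Hypothetical scheduler hook (not the SchedulerEngine wiring itself): each tick hands
    // the indexer the current wall-clock time and the indexer decides whether a run is legal.
    static void onScheduleTick(AsyncTwoPhaseIndexer<?, ?> indexer) {
        if (indexer.maybeTriggerAsyncJob(System.currentTimeMillis()) == false) {
            // stopped, aborting, or a prior run is still in flight; wait for the next tick
        }
    }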

    /**
     * Checks the {@link IndexerState} and returns false if the execution
     * should be stopped.
     */
    private boolean checkState(IndexerState currentState) {
        switch (currentState) {
            case INDEXING:
                // normal state;
                return true;

            case STOPPING:
                logger.info("Rollup job encountered [" + IndexerState.STOPPING + "] state, halting indexer.");
                doSaveState(finishAndSetState(), getPosition(), () -> {});
                return false;

            case STOPPED:
                return false;

            case ABORTING:
                logger.info("Requested shutdown of indexer for job [" + job.getConfig().getId() + "]");
                onAbort();
                return false;

            default:
                // Anything other than indexing, aborting or stopping is unanticipated
                logger.warn("Encountered unexpected state [" + currentState + "] while indexing");
                throw new IllegalStateException("Rollup job encountered an illegal state [" + currentState + "]");
        }
    }

    private void onBulkResponse(BulkResponse response, Map<String, Object> after) {
        // TODO: we should check items in the response and move `after` accordingly to resume the failing buckets?
        stats.incrementNumRollups(response.getItems().length);
        if (response.hasFailures()) {
            logger.warn("Error while attempting to bulk index rollup documents: " + response.buildFailureMessage());
        }
        try {
            if (checkState(getState()) == false) {
                return;
            }
            position.set(after);
            ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithFailure);
            // TODO: probably something more intelligent than every-50 is needed
            if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
                doSaveState(IndexerState.INDEXING, after, () -> doNextSearch(buildSearchRequest(), listener));
            } else {
                doNextSearch(buildSearchRequest(), listener);
            }
        } catch (Exception e) {
            finishWithFailure(e);
        }
    }
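
    // The every-50-pages checkpoint above is a fixed heuristic (see the TODO); a named
    // constant and helper, both hypothetical, would make the cadence explicit and tunable.
    private static final int PAGES_PER_CHECKPOINT = 50;

    private static boolean shouldCheckpoint(long numPages) {
        return numPages > 0 && numPages % PAGES_PER_CHECKPOINT == 0;
    }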

    private void onSearchResponse(SearchResponse searchResponse) {
        try {
            if (checkState(getState()) == false) {
                return;
            }
            if (searchResponse.getShardFailures().length != 0) {
                throw new RuntimeException("Shard failures encountered while running indexer for rollup job ["
                    + job.getConfig().getId() + "]: " + Arrays.toString(searchResponse.getShardFailures()));
            }
            final CompositeAggregation response = searchResponse.getAggregations().get(AGGREGATION_NAME);
            if (response == null) {
                throw new IllegalStateException("Missing composite response for query: " + compositeBuilder.toString());
            }
            stats.incrementNumPages(1);
            if (response.getBuckets().isEmpty()) {
                // this is the end...
                logger.debug("Finished indexing for job [" + job.getConfig().getId() + "], saving state and shutting down.");

                // Change state first, then try to persist. This prevents in-progress STOPPING/ABORTING from
                // being persisted as STARTED but then stop the job
                doSaveState(finishAndSetState(), position.get(), this::onFinish);
                return;
            }

            final BulkRequest bulkRequest = new BulkRequest();
    @Override
    protected SearchRequest buildSearchRequest() {
            // Indexer is single-threaded, and the only place that the ID scheme can get upgraded is doSaveState(), so
            // we can pass down the boolean value rather than the atomic here
            final List<IndexRequest> docs = IndexerUtils.processBuckets(response, job.getConfig().getRollupIndex(),
                stats, job.getConfig().getGroupConfig(), job.getConfig().getId(), upgradedDocumentID.get());
            docs.forEach(bulkRequest::add);
            assert bulkRequest.requests().size() > 0;
            doNextBulk(bulkRequest,
                ActionListener.wrap(
                    bulkResponse -> onBulkResponse(bulkResponse, response.afterKey()),
                    exc -> finishWithFailure(exc)
                )
            );
        } catch (Exception e) {
            finishWithFailure(e);
        }
    }

    private void finishWithFailure(Exception exc) {
        doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
    }

    private IndexerState finishAndSetState() {
        return state.updateAndGet(
            prev -> {
                switch (prev) {
                    case INDEXING:
                        // ready for another job
                        return IndexerState.STARTED;

                    case STOPPING:
                        // must be started again
                        return IndexerState.STOPPED;

                    case ABORTING:
                        // abort and exit
                        onAbort();
                        return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first

                    case STOPPED:
                        // No-op. Shouldn't really be possible to get here (we would have to go through STOPPING
                        // first, which will be handled) but it is harmless to no-op and we don't want to throw an exception here
                        return IndexerState.STOPPED;

                    default:
                        // any other state is unanticipated at this point
                        throw new IllegalStateException("Rollup job encountered an illegal state [" + prev + "]");
                }
            });
    }
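
    // The switch above condenses to this transition table:
    //   INDEXING -> STARTED    (ready for the next trigger)
    //   STOPPING -> STOPPED    (must be start()ed again)
    //   ABORTING -> ABORTING   (onAbort() tears the task down)
    //   STOPPED  -> STOPPED    (defensive no-op)
    //   anything else          -> IllegalStateException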

    private SearchRequest buildSearchRequest() {
        final Map<String, Object> position = getPosition();
        SearchSourceBuilder searchSource = new SearchSourceBuilder()
            .size(0)
@ -405,6 +119,16 @@ public abstract class RollupIndexer {
        return new SearchRequest(job.getConfig().getIndexPattern()).source(searchSource);
    }

    @Override
    protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
        final CompositeAggregation response = searchResponse.getAggregations().get(AGGREGATION_NAME);

        return new IterationResult<>(
            IndexerUtils.processBuckets(response, job.getConfig().getRollupIndex(), getStats(),
                job.getConfig().getGroupConfig(), job.getConfig().getId(), upgradedDocumentID.get()),
            response.afterKey(), response.getBuckets().isEmpty());
    }
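
    // A hedged sketch of what another implementor's doProcess() might look like; MyPosition,
    // toIndexRequests() and nextPosition() are hypothetical names, not part of this change.
    // The contract is simply: the docs to index, the next position, and whether we are done.
    //
    //     @Override
    //     protected IterationResult<MyPosition> doProcess(SearchResponse response) {
    //         List<IndexRequest> docs = toIndexRequests(response);
    //         return new IterationResult<>(docs, nextPosition(response), docs.isEmpty());
    //     }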

    /**
     * Creates a skeleton {@link CompositeAggregationBuilder} from the provided job config.
     * @param config The config for the job.
@ -481,7 +205,7 @@ public abstract class RollupIndexer {
            final TermsGroupConfig terms = groupConfig.getTerms();
            builders.addAll(createValueSourceBuilders(terms));
        }
        return unmodifiableList(builders);
        return Collections.unmodifiableList(builders);
    }

    public static List<CompositeValuesSourceBuilder<?>> createValueSourceBuilders(final DateHistogramGroupConfig dateHistogram) {
@ -491,7 +215,7 @@ public abstract class RollupIndexer {
        dateHistogramBuilder.dateHistogramInterval(dateHistogram.getInterval());
        dateHistogramBuilder.field(dateHistogramField);
        dateHistogramBuilder.timeZone(toDateTimeZone(dateHistogram.getTimeZone()));
        return singletonList(dateHistogramBuilder);
        return Collections.singletonList(dateHistogramBuilder);
    }

    public static List<CompositeValuesSourceBuilder<?>> createValueSourceBuilders(final HistogramGroupConfig histogram) {
@ -506,7 +230,7 @@ public abstract class RollupIndexer {
                builders.add(histogramBuilder);
            }
        }
        return unmodifiableList(builders);
        return Collections.unmodifiableList(builders);
    }

    public static List<CompositeValuesSourceBuilder<?>> createValueSourceBuilders(final TermsGroupConfig terms) {
@ -520,7 +244,7 @@ public abstract class RollupIndexer {
                builders.add(termsBuilder);
            }
        }
        return unmodifiableList(builders);
        return Collections.unmodifiableList(builders);
    }
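
    // Assumed wiring for the skeleton aggregation (a sketch, not the method elided by the
    // hunks above): the per-group value sources are concatenated into one composite agg.
    // "pageSize" is a stand-in for however the batch size is configured.
    static CompositeAggregationBuilder exampleComposite(GroupConfig groupConfig, int pageSize) {
        List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>();
        sources.addAll(createValueSourceBuilders(groupConfig.getDateHistogram()));
        if (groupConfig.getHistogram() != null) {
            sources.addAll(createValueSourceBuilders(groupConfig.getHistogram()));
        }
        if (groupConfig.getTerms() != null) {
            sources.addAll(createValueSourceBuilders(groupConfig.getTerms()));
        }
        return new CompositeAggregationBuilder(AGGREGATION_NAME, sources).size(pageSize);
    }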

    /**
@ -564,7 +288,7 @@ public abstract class RollupIndexer {
                }
            }
        }
        return unmodifiableList(builders);
        return Collections.unmodifiableList(builders);
    }

    private static DateTimeZone toDateTimeZone(final String timezone) {

@ -25,13 +25,13 @@ import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.IndexerState;
import org.elasticsearch.xpack.core.rollup.job.RollupIndexerJobStats;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.rollup.Rollup;
@ -218,7 +218,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
     * Gets the stats for this task.
     * @return The stats of this task
     */
    public RollupJobStats getStats() {
    public RollupIndexerJobStats getStats() {
        return indexer.getStats();
    }
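
// With stats now abstract, a task can expose counters beyond the built-ins. A hedged
// sketch of a custom subclass; the extra field, its name, and the exact base-class API
// (getNumPages(), an abstract toXContent()) are assumptions for illustration.
class MyJobStats extends IndexerJobStats {
    private long numDocsSkipped = 0;

    void incrementNumDocsSkipped(long n) {
        numDocsSkipped += n;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
        builder.startObject();
        builder.field("pages_processed", getNumPages());   // built-in counter from the base class
        builder.field("docs_skipped", numDocsSkipped);     // custom counter
        return builder.endObject();
    }
}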

@ -12,21 +12,19 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.rollup.Rollup;
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction;

import java.io.IOException;
import org.elasticsearch.xpack.rollup.Rollup;

public class RestGetRollupJobsAction extends BaseRestHandler {
    public static final ParseField ID = new ParseField("id");

    public RestGetRollupJobsAction(Settings settings, RestController controller) {
        super(settings);
        controller.registerHandler(RestRequest.Method.GET, Rollup.BASE_PATH + "job/{id}/", this);
        controller.registerHandler(RestRequest.Method.GET, Rollup.BASE_PATH + "job/{id}/", this);
    }

    @Override
    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
        String id = restRequest.param(ID.getPreferredName());
        GetRollupJobsAction.Request request = new GetRollupJobsAction.Request(id);

@ -41,7 +41,7 @@ import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.HistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
import org.elasticsearch.xpack.core.rollup.job.RollupIndexerJobStats;
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
import org.joda.time.DateTime;
import org.mockito.stubbing.Answer;
@ -66,7 +66,7 @@ import static org.mockito.Mockito.when;
public class IndexerUtilsTests extends AggregatorTestCase {
    public void testMissingFields() throws IOException {
        String indexName = randomAlphaOfLengthBetween(1, 10);
        RollupJobStats stats = new RollupJobStats(0, 0, 0, 0);
        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0);

        String timestampField = "the_histo";
        String valueField = "the_avg";
@ -130,7 +130,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {

    public void testCorrectFields() throws IOException {
        String indexName = randomAlphaOfLengthBetween(1, 10);
        RollupJobStats stats = new RollupJobStats(0, 0, 0, 0);
        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0);

        String timestampField = "the_histo";
        String valueField = "the_avg";
@ -198,7 +198,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {

    public void testNumericTerms() throws IOException {
        String indexName = randomAlphaOfLengthBetween(1, 10);
        RollupJobStats stats = new RollupJobStats(0, 0, 0, 0);
        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0);

        String timestampField = "the_histo";
        String valueField = "the_avg";
@ -255,7 +255,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {

    public void testEmptyCounts() throws IOException {
        String indexName = randomAlphaOfLengthBetween(1, 10);
        RollupJobStats stats = new RollupJobStats(0, 0, 0, 0);
        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0);

        String timestampField = "ts";
        String valueField = "the_avg";
@ -362,7 +362,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
        // The content of the config doesn't actually matter for this test
        // because the test is just looking at agg keys
        GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(123L, "abc"), null);
        List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig, "foo", false);
        List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", false);
        assertThat(docs.size(), equalTo(1));
        assertThat(docs.get(0).id(), equalTo("1237859798"));
    }
@ -406,7 +406,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
        });

        GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1L, "abc"), null);
        List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig, "foo", true);
        List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true);
        assertThat(docs.size(), equalTo(1));
        assertThat(docs.get(0).id(), equalTo("foo$c9LcrFqeFW92uN_Z7sv1hA"));
    }
@ -456,7 +456,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
        });

        GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1, "abc"), null);
        List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig, "foo", true);
        List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true);
        assertThat(docs.size(), equalTo(1));
        assertThat(docs.get(0).id(), equalTo("foo$VAFKZpyaEqYRPLyic57_qw"));
    }
@ -483,14 +483,15 @@ public class IndexerUtilsTests extends AggregatorTestCase {
        });

        GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), randomHistogramGroupConfig(random()), null);
        List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig, "foo", randomBoolean());
        List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(),
            groupConfig, "foo", randomBoolean());
        assertThat(docs.size(), equalTo(1));
        assertFalse(Strings.isNullOrEmpty(docs.get(0).id()));
    }

    public void testMissingBuckets() throws IOException {
        String indexName = randomAlphaOfLengthBetween(1, 10);
        RollupJobStats stats = new RollupJobStats(0, 0, 0, 0);
        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0);

        String metricField = "metric_field";
        String valueField = "value_field";

@ -50,10 +50,10 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.IndexerState;
import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Before;

@ -23,8 +23,8 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.IndexerState;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.mockito.stubbing.Answer;

@ -639,7 +639,7 @@ public class RollupIndexerStateTests extends ESTestCase {
            assertThat(indexer.getStats().getNumPages(), equalTo(1L));

            // Note: no docs were indexed
            assertThat(indexer.getStats().getNumRollups(), equalTo(0L));
            assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
            assertTrue(indexer.abort());
        } finally {
            executor.shutdownNow();
@ -743,7 +743,7 @@ public class RollupIndexerStateTests extends ESTestCase {
            assertThat(indexer.getStats().getNumPages(), equalTo(1L));

            // Note: no docs were indexed
            assertThat(indexer.getStats().getNumRollups(), equalTo(0L));
            assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
            assertTrue(indexer.abort());
        } finally {
            executor.shutdownNow();
@ -763,7 +763,7 @@ public class RollupIndexerStateTests extends ESTestCase {
        Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);

        Consumer<Exception> failureConsumer = e -> {
            assertThat(e.getMessage(), startsWith("Shard failures encountered while running indexer for rollup job"));
            assertThat(e.getMessage(), startsWith("Shard failures encountered while running indexer for job"));
            isFinished.set(true);
        };

@ -786,7 +786,7 @@ public class RollupIndexerStateTests extends ESTestCase {

            // Note: no pages processed, no docs were indexed
            assertThat(indexer.getStats().getNumPages(), equalTo(0L));
            assertThat(indexer.getStats().getNumRollups(), equalTo(0L));
            assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
            assertTrue(indexer.abort());
        } finally {
            executor.shutdownNow();
@ -896,7 +896,7 @@ public class RollupIndexerStateTests extends ESTestCase {
            assertThat(indexer.getStats().getNumPages(), equalTo(1L));

            // Note: no docs were indexed
            assertThat(indexer.getStats().getNumRollups(), equalTo(0L));
            assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
            assertTrue(indexer.abort());
        } finally {
            executor.shutdownNow();

@ -20,11 +20,11 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.IndexerState;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;

@ -210,3 +210,4 @@ setup:
        job_state: "stopped"
        upgraded_doc_id: true