[Transform] fix NPE in derive stats if shouldStopAtNextCheckpo… (#52940)
fixes a NPE in _stats in case shouldStopAtNextCheckpoint is set.
This commit is contained in:
parent
d102158e6f
commit
49f41d127b
|
@ -18,6 +18,7 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
@ -189,7 +190,7 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
|
|||
&& derivedState.equals(TransformStats.State.STOPPED) == false
|
||||
&& derivedState.equals(TransformStats.State.FAILED) == false) {
|
||||
derivedState = TransformStats.State.STOPPING;
|
||||
reason = reason.isEmpty() ? "transform is set to stop at the next checkpoint" : reason;
|
||||
reason = Strings.isNullOrEmpty(reason) ? "transform is set to stop at the next checkpoint" : reason;
|
||||
}
|
||||
return new TransformStats(
|
||||
task.getTransformId(),
|
||||
|
|
|
@ -28,101 +28,159 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
|
|||
|
||||
public void testDeriveStatsStopped() {
|
||||
String transformId = "transform-with-stats";
|
||||
String reason = "";
|
||||
String reason = null;
|
||||
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
|
||||
TransformState stoppedState =
|
||||
new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true);
|
||||
TransformState stoppedState = new TransformState(
|
||||
TransformTaskState.STOPPED,
|
||||
IndexerState.STOPPED,
|
||||
null,
|
||||
0,
|
||||
reason,
|
||||
null,
|
||||
null,
|
||||
true
|
||||
);
|
||||
withIdStateAndStats(transformId, stoppedState, stats);
|
||||
TransformCheckpointingInfo info = new TransformCheckpointingInfo(
|
||||
new TransformCheckpointStats(1, null, null, 1, 1),
|
||||
new TransformCheckpointStats(2, null, null, 2, 5),
|
||||
2,
|
||||
Instant.now());
|
||||
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, "", null, stats, TransformCheckpointingInfo.EMPTY)));
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, "", null, stats, info)));
|
||||
Instant.now()
|
||||
);
|
||||
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, TransformCheckpointingInfo.EMPTY))
|
||||
);
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info))
|
||||
);
|
||||
|
||||
reason = "foo";
|
||||
stoppedState = new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true);
|
||||
withIdStateAndStats(transformId, stoppedState, stats);
|
||||
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info)));
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, TransformCheckpointingInfo.EMPTY))
|
||||
);
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info))
|
||||
);
|
||||
}
|
||||
|
||||
public void testDeriveStatsFailed() {
|
||||
String transformId = "transform-with-stats";
|
||||
String reason = "";
|
||||
String reason = null;
|
||||
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
|
||||
TransformState failedState =
|
||||
new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true);
|
||||
TransformState failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true);
|
||||
withIdStateAndStats(transformId, failedState, stats);
|
||||
TransformCheckpointingInfo info = new TransformCheckpointingInfo(
|
||||
new TransformCheckpointStats(1, null, null, 1, 1),
|
||||
new TransformCheckpointStats(2, null, null, 2, 5),
|
||||
2,
|
||||
Instant.now());
|
||||
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, "", null, stats, TransformCheckpointingInfo.EMPTY)));
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, "", null, stats, info)));
|
||||
Instant.now()
|
||||
);
|
||||
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, TransformCheckpointingInfo.EMPTY))
|
||||
);
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info))
|
||||
);
|
||||
|
||||
reason = "the task is failed";
|
||||
failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true);
|
||||
withIdStateAndStats(transformId, failedState, stats);
|
||||
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info)));
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, TransformCheckpointingInfo.EMPTY))
|
||||
);
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info))
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
public void testDeriveStats() {
|
||||
String transformId = "transform-with-stats";
|
||||
String reason = "";
|
||||
String reason = null;
|
||||
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
|
||||
TransformState runningState =
|
||||
new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true);
|
||||
TransformState runningState = new TransformState(
|
||||
TransformTaskState.STARTED,
|
||||
IndexerState.INDEXING,
|
||||
null,
|
||||
0,
|
||||
reason,
|
||||
null,
|
||||
null,
|
||||
true
|
||||
);
|
||||
withIdStateAndStats(transformId, runningState, stats);
|
||||
TransformCheckpointingInfo info = new TransformCheckpointingInfo(
|
||||
new TransformCheckpointStats(1, null, null, 1, 1),
|
||||
new TransformCheckpointStats(2, null, null, 2, 5),
|
||||
2,
|
||||
Instant.now());
|
||||
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING,
|
||||
"transform is set to stop at the next checkpoint", null, stats, TransformCheckpointingInfo.EMPTY)));
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING,
|
||||
"transform is set to stop at the next checkpoint", null, stats, info)));
|
||||
Instant.now()
|
||||
);
|
||||
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(
|
||||
new TransformStats(
|
||||
transformId,
|
||||
TransformStats.State.STOPPING,
|
||||
"transform is set to stop at the next checkpoint",
|
||||
null,
|
||||
stats,
|
||||
TransformCheckpointingInfo.EMPTY
|
||||
)
|
||||
)
|
||||
);
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(
|
||||
new TransformStats(
|
||||
transformId,
|
||||
TransformStats.State.STOPPING,
|
||||
"transform is set to stop at the next checkpoint",
|
||||
null,
|
||||
stats,
|
||||
info
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
reason = "foo";
|
||||
runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true);
|
||||
withIdStateAndStats(transformId, runningState, stats);
|
||||
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, info)));
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, TransformCheckpointingInfo.EMPTY))
|
||||
);
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, info))
|
||||
);
|
||||
|
||||
// Stop at next checkpoint is false.
|
||||
runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, false);
|
||||
withIdStateAndStats(transformId, runningState, stats);
|
||||
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, TransformCheckpointingInfo.EMPTY)));
|
||||
assertThat(TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, info)));
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, null),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, TransformCheckpointingInfo.EMPTY))
|
||||
);
|
||||
assertThat(
|
||||
TransportGetTransformStatsAction.deriveStats(task, info),
|
||||
equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, info))
|
||||
);
|
||||
}
|
||||
|
||||
private void withIdStateAndStats(String transformId, TransformState state, TransformIndexerStats stats) {
|
||||
|
|
Loading…
Reference in New Issue