mirror of https://github.com/apache/druid.git
Add new test for handoff API (#16492)
* Add new test for handoff API * Add new method * fix test * Update test
This commit is contained in:
parent
21f725f33e
commit
f7013e012c
|
@ -160,6 +160,36 @@ public class OverlordResourceTestClient
|
|||
}
|
||||
}
|
||||
|
||||
public StatusResponseHolder handoffTaskGroupEarly(
|
||||
String dataSource,
|
||||
String taskGroups
|
||||
)
|
||||
{
|
||||
try {
|
||||
LOG.info("handing off %s %s", dataSource, taskGroups);
|
||||
StatusResponseHolder response = httpClient.go(
|
||||
new Request(HttpMethod.POST, new URL(StringUtils.format(
|
||||
"%ssupervisor/%s/taskGroups/handoff",
|
||||
getIndexerURL(),
|
||||
StringUtils.urlEncode(dataSource)
|
||||
))).setContent(
|
||||
"application/json",
|
||||
StringUtils.toUtf8(taskGroups)
|
||||
),
|
||||
StatusResponseHandler.getInstance()
|
||||
).get();
|
||||
LOG.info("Handoff early response code " + response.getStatus().getCode());
|
||||
LOG.info("Handoff early response " + response.getContent());
|
||||
return response;
|
||||
}
|
||||
catch (ISE e) {
|
||||
throw e;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<TaskResponseObject> getAllTasks()
|
||||
{
|
||||
return getTasks("tasks");
|
||||
|
|
|
@ -32,6 +32,7 @@ public class TaskResponseObject
|
|||
private final DateTime createdTime;
|
||||
private final DateTime queueInsertionTime;
|
||||
private final TaskState status;
|
||||
private final Long duration;
|
||||
|
||||
@JsonCreator
|
||||
private TaskResponseObject(
|
||||
|
@ -39,7 +40,8 @@ public class TaskResponseObject
|
|||
@JsonProperty("type") String type,
|
||||
@JsonProperty("createdTime") DateTime createdTime,
|
||||
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
|
||||
@JsonProperty("status") TaskState status
|
||||
@JsonProperty("status") TaskState status,
|
||||
@JsonProperty("duration") Long duration
|
||||
)
|
||||
{
|
||||
this.id = id;
|
||||
|
@ -47,6 +49,7 @@ public class TaskResponseObject
|
|||
this.createdTime = createdTime;
|
||||
this.queueInsertionTime = queueInsertionTime;
|
||||
this.status = status;
|
||||
this.duration = duration;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -78,4 +81,10 @@ public class TaskResponseObject
|
|||
{
|
||||
return status;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Long getDuration()
|
||||
{
|
||||
return duration;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.Intervals;
|
|||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.io.Closer;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
|
||||
import org.apache.druid.testing.IntegrationTestingConfig;
|
||||
|
@ -75,6 +76,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
|
|||
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
|
||||
private static final int STREAM_SHARD_COUNT = 2;
|
||||
protected static final long CYCLE_PADDING_MS = 100;
|
||||
private static final int LONG_DURATION_SUPERVISOR_MILLIS = 600 * 1000;
|
||||
|
||||
private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
|
||||
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
|
||||
|
@ -82,6 +84,9 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
|
|||
private static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE =
|
||||
"supervisor_with_idle_behaviour_enabled_spec_template.json";
|
||||
|
||||
private static final String SUPERVISOR_LONG_DURATION_TEMPLATE_FILE =
|
||||
"supervisor_with_long_duration.json";
|
||||
|
||||
protected static final String DATA_RESOURCE_ROOT = "/stream/data";
|
||||
protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
|
||||
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
|
||||
|
@ -90,6 +95,9 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
|
|||
protected static final String SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH =
|
||||
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE);
|
||||
|
||||
protected static final String SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH =
|
||||
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_LONG_DURATION_TEMPLATE_FILE);
|
||||
|
||||
protected static final String SERIALIZER_SPEC_DIR = "serializer";
|
||||
protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
|
||||
protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
|
||||
|
@ -230,6 +238,113 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
|
|||
}
|
||||
}
|
||||
|
||||
protected void doTestIndexDataHandoffEarly(
|
||||
@Nullable Boolean transactionEnabled
|
||||
) throws Exception
|
||||
{
|
||||
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
|
||||
INPUT_FORMAT,
|
||||
getResourceAsString(JSON_INPUT_FORMAT_PATH)
|
||||
);
|
||||
try (
|
||||
final Closeable closer = createResourceCloser(generatedTestConfig);
|
||||
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
|
||||
) {
|
||||
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
|
||||
.apply(getResourceAsString(SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH));
|
||||
LOG.info("supervisorSpec: [%s]\n", taskSpec);
|
||||
// Start supervisor
|
||||
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
|
||||
LOG.info("Submitted supervisor");
|
||||
|
||||
// Start generating half of the data
|
||||
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
|
||||
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
|
||||
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
|
||||
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
|
||||
new JsonEventSerializer(jsonMapper),
|
||||
EVENTS_PER_SECOND,
|
||||
CYCLE_PADDING_MS
|
||||
);
|
||||
long numWritten = streamGenerator.run(
|
||||
generatedTestConfig.getStreamName(),
|
||||
streamEventWriter,
|
||||
secondsToGenerateFirstRound,
|
||||
FIRST_EVENT_TIME
|
||||
);
|
||||
|
||||
// Make sure we consume the data written
|
||||
long numWrittenHalf = numWritten;
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() ->
|
||||
numWrittenHalf == this.queryHelper.countRows(
|
||||
generatedTestConfig.getFullDatasourceName(),
|
||||
Intervals.ETERNITY,
|
||||
name -> new LongSumAggregatorFactory(name, "count")
|
||||
),
|
||||
StringUtils.format(
|
||||
"dataSource[%s] consumed [%,d] events, expected [%,d]",
|
||||
generatedTestConfig.getFullDatasourceName(),
|
||||
this.queryHelper.countRows(
|
||||
generatedTestConfig.getFullDatasourceName(),
|
||||
Intervals.ETERNITY,
|
||||
name -> new LongSumAggregatorFactory(name, "count")
|
||||
),
|
||||
numWritten
|
||||
)
|
||||
);
|
||||
|
||||
// Trigger early handoff
|
||||
StatusResponseHolder response = indexer.handoffTaskGroupEarly(
|
||||
generatedTestConfig.getFullDatasourceName(),
|
||||
jsonMapper.writeValueAsString(
|
||||
ImmutableMap.of(
|
||||
"taskGroupIds", ImmutableList.of(0)
|
||||
)
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(response.getStatus().getCode(), 200);
|
||||
|
||||
// Load the rest of the data
|
||||
numWritten += streamGenerator.run(
|
||||
generatedTestConfig.getStreamName(),
|
||||
streamEventWriter,
|
||||
secondsToGenerateRemaining,
|
||||
FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
|
||||
);
|
||||
|
||||
// Make sure we consume the rest of the data
|
||||
long numWrittenAll = numWritten;
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() ->
|
||||
numWrittenAll == this.queryHelper.countRows(
|
||||
generatedTestConfig.getFullDatasourceName(),
|
||||
Intervals.ETERNITY,
|
||||
name -> new LongSumAggregatorFactory(name, "count")
|
||||
),
|
||||
StringUtils.format(
|
||||
"dataSource[%s] consumed [%,d] events, expected [%,d]",
|
||||
generatedTestConfig.getFullDatasourceName(),
|
||||
this.queryHelper.countRows(
|
||||
generatedTestConfig.getFullDatasourceName(),
|
||||
Intervals.ETERNITY,
|
||||
name -> new LongSumAggregatorFactory(name, "count")
|
||||
),
|
||||
numWritten
|
||||
)
|
||||
);
|
||||
|
||||
// Wait for the early handoff task to complete and cheeck its duration
|
||||
ITRetryUtil.retryUntilTrue(
|
||||
() -> (!indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName()).isEmpty()),
|
||||
"Waiting for Task Completion"
|
||||
);
|
||||
|
||||
List<TaskResponseObject> completedTasks = indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName());
|
||||
Assert.assertEquals(completedTasks.stream().filter(taskResponseObject -> taskResponseObject.getDuration() < LONG_DURATION_SUPERVISOR_MILLIS).count(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
void doTestIndexDataWithLosingCoordinator(@Nullable Boolean transactionEnabled) throws Exception
|
||||
{
|
||||
testIndexWithLosingNodeHelper(
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.tests.indexer;
|
||||
|
||||
import org.apache.druid.testing.guice.DruidTestModuleFactory;
|
||||
import org.apache.druid.tests.TestNGGroup;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
import org.testng.annotations.Guice;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
@Test(groups = TestNGGroup.KAFKA_INDEX)
|
||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||
public class ITKafkaIndexingServiceStopTasksEarlyTest extends AbstractKafkaIndexingServiceTest
|
||||
{
|
||||
@Override
|
||||
public String getTestNamePrefix()
|
||||
{
|
||||
return "kafka_stop_tasks_early";
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public void beforeClass() throws Exception
|
||||
{
|
||||
doBeforeClass();
|
||||
}
|
||||
|
||||
|
||||
// This test does not actually check whether the tasks stopped early since the API does
|
||||
// not make any guarantees about handoff. Instead it makes sure the handoff API can be called
|
||||
// and that the tasks will eventually catch up with new data.
|
||||
@Test
|
||||
public void testStopTasksEarly() throws Exception
|
||||
{
|
||||
doTestIndexDataHandoffEarly(false);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
{
|
||||
"type": "%%STREAM_TYPE%%",
|
||||
"dataSchema": {
|
||||
"dataSource": "%%DATASOURCE%%",
|
||||
"parser": %%PARSER%%,
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "auto"
|
||||
},
|
||||
"dimensionsSpec": {
|
||||
"dimensions": %%DIMENSIONS%%,
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
},
|
||||
"metricsSpec": [
|
||||
{
|
||||
"type": "count",
|
||||
"name": "count"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "added",
|
||||
"fieldName": "added"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "deleted",
|
||||
"fieldName": "deleted"
|
||||
},
|
||||
{
|
||||
"type": "doubleSum",
|
||||
"name": "delta",
|
||||
"fieldName": "delta"
|
||||
}
|
||||
],
|
||||
"granularitySpec": {
|
||||
"type": "uniform",
|
||||
"segmentGranularity": "MINUTE",
|
||||
"queryGranularity": "NONE"
|
||||
}
|
||||
},
|
||||
"tuningConfig": {
|
||||
"type": "%%STREAM_TYPE%%",
|
||||
"intermediatePersistPeriod": "PT30S",
|
||||
"maxRowsPerSegment": 5000000,
|
||||
"maxRowsInMemory": 500000
|
||||
},
|
||||
"ioConfig": {
|
||||
"%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
|
||||
"%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
|
||||
"taskCount": 1,
|
||||
"replicas": 1,
|
||||
"taskDuration": "PT600S",
|
||||
"%%USE_EARLIEST_KEY%%": true,
|
||||
"inputFormat" : %%INPUT_FORMAT%%
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue