Speed up integration tests in two ways. (#10648)

1) Accelerate coordinator runs to speed up segment load after publishing.

2) For streaming ingestion tests, instead of waiting 3 minutes for data to
   load, wait until the expected number of rows is loaded.

Also updates the segment-count check in ITCompactionTaskTest to eliminate a
race condition (it was looking for 6 segments, which only exist together
briefly, until the coordinator marks the older 4 unused).
Gian Merlino 2020-12-07 10:59:29 -08:00 committed by GitHub
parent f46cc4faaf
commit b681861f05
8 changed files with 87 additions and 38 deletions
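
In short, change (2) swaps the fixed sleep for a bounded poll. Condensed from
the diffs below (numWritten, queryHelper, and fullDatasourceName are the names
used there):

// Poll until the ingested row count matches the number of messages the
// generator reported writing, instead of sleeping for a fixed 3 minutes.
ITRetryUtil.retryUntilTrue(
    () -> numWritten == queryHelper.countRows(
        fullDatasourceName,
        Intervals.ETERNITY,
        name -> new LongSumAggregatorFactory(name, "count")
    ),
    "Waiting for [" + numWritten + "] rows"
);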

@@ -32,7 +32,11 @@ druid_coordinator_startDelay=PT5S
druid_manager_lookups_hostUpdateTimeout=PT30S
druid_manager_lookups_period=10000
druid_manager_lookups_threadPoolSize=2
druid_manager_config_pollDuration=PT10S
druid_manager_rules_pollDuration=PT10S
druid_manager_segments_pollDuration=PT2S
druid_auth_basic_common_cacheDirectory=/tmp/authCache/coordinator
druid_auth_unsecuredPaths=["/druid/coordinator/v1/loadqueue"]
druid_server_https_crlPath=/tls/revocations.crl
druid_coordinator_period_indexingPeriod=PT180000S
druid_coordinator_period=PT1S

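druid_coordinator_period is the interval between coordinator duty runs (segment
assignment, balancing, marking segments used/unused), and
druid_manager_segments_pollDuration is how often the coordinator polls the
metadata store. At PT1S and PT2S, newly published segments become loadable
within seconds instead of waiting out a long coordinator cycle. A sketch of the
kind of check these settings speed up (using coordinator as the integration-test
coordinator client is an assumption here, not part of this diff):

// With the 1-second coordinator period, this poll typically completes in
// seconds rather than minutes.
ITRetryUtil.retryUntilTrue(
    () -> coordinator.areSegmentsLoaded(fullDatasourceName),
    "Segment load for " + fullDatasourceName
);
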
@@ -27,13 +27,16 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.AbstractQueryResourceTestClient;
import org.joda.time.Interval;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQueryWithResults>
{
@@ -135,17 +138,13 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQueryWithResults>
}
@SuppressWarnings("unchecked")
public int countRows(String dataSource, String interval)
public int countRows(String dataSource, Interval interval, Function<String, AggregatorFactory> countAggregator)
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(dataSource)
.aggregators(
ImmutableList.of(
new LongSumAggregatorFactory("rows", "count")
)
)
.aggregators(ImmutableList.of(countAggregator.apply("rows")))
.granularity(Granularities.ALL)
.intervals(interval)
.intervals(Collections.singletonList(interval))
.build();
List<Map<String, Object>> results = queryClient.query(getQueryURL(broker), query);

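countRows now takes the count aggregator as a parameter instead of hard-coding
LongSumAggregatorFactory, so each test can count rows with whatever metric its
datasource uses. Every call site in this commit passes the same lambda:

int rows = queryHelper.countRows(
    fullDatasourceName,
    Intervals.ETERNITY,
    name -> new LongSumAggregatorFactory(name, "count")
);
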
@@ -23,7 +23,13 @@ import org.joda.time.DateTime;
public interface StreamGenerator
{
void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds);
/**
* Runs and returns the number of messages written.
*/
long run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds);
void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime);
/**
* Runs and returns the number of messages written.
*/
long run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime);
}

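A minimal caller sketch of the new contract (variable names as in the tests
below): the returned count feeds an exact row-count verification rather than a
timed wait.

long numWritten = streamGenerator.run(streamTopic, streamEventWriter, totalNumberOfSeconds);
// ...submit the supervisor, then retry until countRows(...) == numWritten.
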
@@ -48,13 +48,18 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
abstract List<Pair<String, Object>> newEvent(int row, DateTime timestamp);
@Override
public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds)
public long run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds)
{
run(streamTopic, streamEventWriter, totalNumberOfSeconds, null);
return run(streamTopic, streamEventWriter, totalNumberOfSeconds, null);
}
@Override
public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime)
public long run(
String streamTopic,
StreamEventWriter streamEventWriter,
int totalNumberOfSeconds,
DateTime overrrideFirstEventTime
)
{
// The idea here is that we will send [eventsPerSecond] events that will either use [nowCeilingToSecond]
// or the [overrrideFirstEventTime] as the primary timestamp.
@@ -65,6 +70,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
DateTime nowCeilingToSecond = DateTimes.nowUtc().secondOfDay().roundCeilingCopy();
DateTime eventTimestamp = overrrideFirstEventTime == null ? nowCeilingToSecond : overrrideFirstEventTime;
int seconds = 0;
long numWritten = 0;
while (true) {
try {
@@ -87,6 +93,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
for (int i = 1; i <= eventsPerSecond; i++) {
streamEventWriter.write(streamTopic, serializer.serialize(newEvent(i, eventTimestamp)));
numWritten++;
long sleepTime = calculateSleepTimeMs(eventsPerSecond - i, nowCeilingToSecond);
if ((i <= 100 && i % 10 == 0) || i % 100 == 0) {
@@ -126,6 +133,8 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
throw new RuntimeException("Exception in event generation loop", e);
}
}
return numWritten;
}
/**

@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
@@ -111,7 +112,11 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTest
// wait for a while to let the events be ingested
ITRetryUtil.retryUntil(
() -> {
final int countRows = queryHelper.countRows(fullDatasourceName, Intervals.ETERNITY.toString());
final int countRows = queryHelper.countRows(
fullDatasourceName,
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
);
return countRows == getNumExpectedRowsIngested();
},
true,

@@ -25,8 +25,11 @@ import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.DruidClusterAdminClient;
import org.apache.druid.testing.utils.EventSerializer;
@@ -66,7 +69,6 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// The value of this tag is a timestamp that can be used by a lambda function to remove unused streams.
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
private static final int STREAM_SHARD_COUNT = 2;
private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
private static final long CYCLE_PADDING_MS = 100;
private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
@@ -191,13 +193,13 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
LOG.info("Submitted supervisor");
// Start data generator
streamGenerator.run(
final long numWritten = streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
TOTAL_NUMBER_OF_SECOND,
FIRST_EVENT_TIME
);
verifyIngestedData(generatedTestConfig);
verifyIngestedData(generatedTestConfig, numWritten);
}
}
@@ -253,7 +255,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
streamGenerator.run(
long numWritten = streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateFirstRound,
@@ -270,7 +272,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Suspend the supervisor
indexer.suspendSupervisor(generatedTestConfig.getSupervisorId());
// Start generating the remaining half of the data
streamGenerator.run(
numWritten += streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateRemaining,
@@ -287,7 +289,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
"Waiting for supervisor to be healthy"
);
// Verify that supervisor can catch up with the stream
verifyIngestedData(generatedTestConfig);
verifyIngestedData(generatedTestConfig, numWritten);
}
}
@@ -332,7 +334,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
streamGenerator.run(
long numWritten = streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateFirstRound,
@@ -353,7 +355,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Start generating one third of the data (while restarting)
int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
streamGenerator.run(
numWritten += streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateSecondRound,
@@ -364,7 +366,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
waitForReadyRunnable.run();
LOG.info("Druid process is now available");
// Start generating remaining data (after restarting)
streamGenerator.run(
numWritten += streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateRemaining,
@@ -379,7 +381,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
"Waiting for supervisor to be healthy"
);
// Verify that supervisor ingested all data
verifyIngestedData(generatedTestConfig);
verifyIngestedData(generatedTestConfig, numWritten);
}
}
@@ -409,7 +411,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
streamGenerator.run(
long numWritten = streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateFirstRound,
@@ -428,7 +430,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
// Start generating one third of the data (while resharding)
int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
streamGenerator.run(
numWritten += streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateSecondRound,
@@ -454,7 +456,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
"Waiting for stream to finish resharding"
);
// Start generating remaining data (after resharding)
streamGenerator.run(
numWritten += streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateRemaining,
@@ -469,15 +471,29 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
"Waiting for supervisor to be healthy"
);
// Verify that supervisor can catch up with the stream
verifyIngestedData(generatedTestConfig);
verifyIngestedData(generatedTestConfig, numWritten);
}
}
private void verifyIngestedData(GeneratedTestConfig generatedTestConfig) throws Exception
private void verifyIngestedData(GeneratedTestConfig generatedTestConfig, long numWritten) throws Exception
{
// Wait for supervisor to consume events
LOG.info("Waiting for [%s] millis for stream indexing tasks to consume events", WAIT_TIME_MILLIS);
Thread.sleep(WAIT_TIME_MILLIS);
LOG.info("Waiting for stream indexing tasks to consume events");
ITRetryUtil.retryUntilTrue(
() ->
numWritten == this.queryHelper.countRows(
generatedTestConfig.getFullDatasourceName(),
Intervals.ETERNITY,
name -> new LongSumAggregatorFactory(name, "count")
),
StringUtils.format(
"dataSource[%s] consumed [%,d] events",
generatedTestConfig.getFullDatasourceName(),
numWritten
)
);
// Query data
final String querySpec = generatedTestConfig.getStreamQueryPropsTransform()
.apply(getResourceAsString(QUERIES_FILE));

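The equality check above leans on rollup arithmetic: several input events can
collapse into one stored row whose "count" metric records how many went in, so
summing "count" recovers the exact number of written messages. A worked
micro-example (illustrative values):

// events written:  {t, dim=a}, {t, dim=a}, {t, dim=b}   -> numWritten = 3
// rows stored:     (t, a, count=2), (t, b, count=1)
// LongSum("count") = 2 + 1 = 3 == numWritten
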
@@ -78,6 +78,10 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private void loadDataAndCompact(String indexTask, String queriesResource) throws Exception
{
loadData(indexTask);
// 4 segments across 2 days
checkNumberOfSegments(4);
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -100,10 +104,9 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
queryHelper.testQueriesFromString(queryResponseTemplate);
compactData();
// 4 segments across 2 days, compacted into 2 new segments (6 total)
checkCompactionFinished(6);
// The original 4 segments should be compacted into 2 new segments
checkNumberOfSegments(2);
queryHelper.testQueriesFromString(queryResponseTemplate);
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
@@ -136,7 +139,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
);
}
private void checkCompactionFinished(int numExpectedSegments)
private void checkNumberOfSegments(int numExpectedSegments)
{
ITRetryUtil.retryUntilTrue(
() -> {
@@ -144,7 +147,7 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments);
return metadataSegmentCount == numExpectedSegments;
},
"Compaction segment count check"
"Segment count check"
);
}

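The race from the commit message, as a timeline (sketch):

// t0: loadData publishes 4 segments (2 days x 2 segments) -> checkNumberOfSegments(4)
// t1: compaction publishes 2 compacted segments; metadata briefly holds all 6
// t2: the coordinator marks the 4 originals unused         -> 2 remain
// Polling for 6 can miss the t1..t2 window; polling for 2 targets the stable
// end state and cannot race.
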
@@ -25,6 +25,7 @@ import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
@@ -32,6 +33,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.EventReceiverFirehoseTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
@@ -118,7 +120,12 @@ public class ITUnionQueryTest extends AbstractIndexerTest
ITRetryUtil.retryUntil(
() -> {
for (int i = 0; i < numTasks; i++) {
final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01");
final int countRows = queryHelper.countRows(
fullDatasourceName + i,
Intervals.of("2013-08-31/2013-09-01"),
name -> new LongSumAggregatorFactory(name, "count")
);
// there are 10 rows, but query only covers the first 5
if (countRows < 5) {
LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i);