Supervisor metadata auto cleanup failing as missing Guice injection (#11424)

* Fix Supervisor metadata auto cleanup failing as missing Guice injection

* Fix Supervisor metadata auto cleanup failing as missing Guice injection

* fix IT

* fix IT

* Update services/src/main/java/org/apache/druid/cli/CliCoordinator.java

Co-authored-by: Clint Wylie <cjwylie@gmail.com>

* fix

* fix

* fix

* fix

* fix

* fix

* fix

Co-authored-by: Clint Wylie <cjwylie@gmail.com>
This commit is contained in:
Maytas Monsereenusorn 2021-07-13 09:47:49 +07:00 committed by GitHub
parent 2236cf2234
commit f5d53569ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 208 additions and 1 deletions

View File

@ -68,6 +68,10 @@ druid_indexer_logs_directory=/shared/tasklogs
druid_sql_enable=true
druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
druid_request_logging_type=slf4j
druid_coordinator_kill_supervisor_on=true
druid_coordinator_kill_supervisor_period=PT10S
druid_coordinator_kill_supervisor_durationToRetain=PT0M
druid_coordinator_period_metadataStoreManagementPeriod=PT10S
# Testing the legacy config from https://github.com/apache/druid/pull/10267
# Can remove this when the flag is no longer needed

View File

@ -507,6 +507,41 @@ public class OverlordResourceTestClient
}
}
public List<Object> getSupervisorHistory(String id)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.GET,
new URL(StringUtils.format(
"%ssupervisor/%s/history",
getIndexerURL(),
StringUtils.urlEncode(id)
))
),
StatusResponseHandler.getInstance()
).get();
if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
return null;
} else if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while getting supervisor status, response [%s %s]",
response.getStatus(),
response.getContent()
);
}
List<Object> responseData = jsonMapper.readValue(
response.getContent(), new TypeReference<List<Object>>()
{
}
);
return responseData;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private StatusResponseHolder makeRequest(HttpMethod method, String url)
{
try {

View File

@ -360,7 +360,100 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
}
}
protected void doTestTerminatedSupervisorAutoCleanup(@Nullable Boolean transactionEnabled) throws Exception
{
final GeneratedTestConfig generatedTestConfig1 = new GeneratedTestConfig(
INPUT_FORMAT,
getResourceAsString(JSON_INPUT_FORMAT_PATH)
);
final GeneratedTestConfig generatedTestConfig2 = new GeneratedTestConfig(
INPUT_FORMAT,
getResourceAsString(JSON_INPUT_FORMAT_PATH)
);
try (
final Closeable closer1 = createResourceCloser(generatedTestConfig1);
final Closeable closer2 = createResourceCloser(generatedTestConfig2);
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
) {
final String taskSpec1 = generatedTestConfig1.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
LOG.info("supervisorSpec1: [%s]\n", taskSpec1);
final String taskSpec2 = generatedTestConfig2.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
LOG.info("supervisorSpec2: [%s]\n", taskSpec2);
// Start both supervisors
generatedTestConfig1.setSupervisorId(indexer.submitSupervisor(taskSpec1));
generatedTestConfig2.setSupervisorId(indexer.submitSupervisor(taskSpec2));
LOG.info("Submitted supervisors");
// Start generating the data
final StreamGenerator streamGenerator1 = new WikipediaStreamEventStreamGenerator(
new JsonEventSerializer(jsonMapper),
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
streamGenerator1.run(
generatedTestConfig1.getStreamName(),
streamEventWriter,
TOTAL_NUMBER_OF_SECOND,
FIRST_EVENT_TIME
);
final StreamGenerator streamGenerator2 = new WikipediaStreamEventStreamGenerator(
new JsonEventSerializer(jsonMapper),
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
streamGenerator2.run(
generatedTestConfig2.getStreamName(),
streamEventWriter,
TOTAL_NUMBER_OF_SECOND,
FIRST_EVENT_TIME
);
// Verify supervisors are healthy before termination
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig1.getSupervisorId())),
true,
10000,
30,
"Waiting for supervisor1 to be healthy"
);
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig2.getSupervisorId())),
true,
10000,
30,
"Waiting for supervisor2 to be healthy"
);
// Sleep for 10 secs to make sure that at least one cycle of supervisor auto cleanup duty ran
Thread.sleep(10000);
// Verify that supervisor specs exist
List<Object> specs1 = indexer.getSupervisorHistory(generatedTestConfig1.getSupervisorId());
Assert.assertNotNull(specs1);
Assert.assertFalse(specs1.isEmpty());
List<Object> specs2 = indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId());
Assert.assertNotNull(specs2);
Assert.assertFalse(specs2.isEmpty());
// Supervisor 1 should still be active while supervisor 2 is now terminated
LOG.info("Terminating supervisor 2");
indexer.terminateSupervisor(generatedTestConfig2.getSupervisorId());
// Verify that auto cleanup eventually removes supervisor spec after termination
ITRetryUtil.retryUntil(
() -> indexer.getSupervisorHistory(generatedTestConfig2.getSupervisorId()) == null,
true,
10000,
30,
"Waiting for supervisor spec 2 to be auto clean"
);
// Verify that supervisor 1 spec was not remove
specs1 = indexer.getSupervisorHistory(generatedTestConfig1.getSupervisorId());
Assert.assertNotNull(specs1);
Assert.assertFalse(specs1.isEmpty());
}
}
protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception
{

View File

@ -71,4 +71,14 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
{
doTestIndexDataWithStreamReshardSplit(false);
}
/**
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
* and supervisor maintained and scoped within this test only
*/
@Test
public void testKafkaTerminatedSupervisorAutoCleanup() throws Exception
{
doTestTerminatedSupervisorAutoCleanup(false);
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.parallelized;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@Test(groups = TestNGGroup.KINESIS_INDEX)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisIndexingServiceTest
{
@Override
public String getTestNamePrefix()
{
return "kinesis_parallelized";
}
@BeforeClass
public void beforeClass() throws Exception
{
doBeforeClass();
}
/**
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
* and supervisor maintained and scoped within this test only
*/
@Test
public void testKinesisTerminatedSupervisorAutoCleanup() throws Exception
{
doTestTerminatedSupervisorAutoCleanup(false);
}
}

View File

@ -79,7 +79,7 @@ public class KillSupervisors implements CoordinatorDuty
supervisorRemoved
)
);
log.info("Finished running KillSupervisors duty. Removed %,d supervisor", supervisorRemoved);
log.info("Finished running KillSupervisors duty. Removed %,d supervisor specs", supervisorRemoved);
}
catch (Exception e) {
log.error(e, "Failed to kill terminated supervisor metadata");

View File

@ -28,6 +28,7 @@ import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.audit.AuditManager;
@ -49,6 +50,8 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.http.JettyHttpClientModule;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ExecutorServices;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@ -64,6 +67,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.SegmentsMetadataManagerProvider;
import org.apache.druid.query.lookup.LookupSerdeModule;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.server.audit.AuditManagerProvider;
import org.apache.druid.server.coordinator.BalancerStrategyFactory;
import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
@ -289,6 +293,13 @@ public class CliCoordinator extends ServerRunnable
Jerseys.addResource(binder, SelfDiscoveryResource.class);
LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
if (!beOverlord) {
// These are needed to deserialize SupervisorSpec for Supervisor Auto Cleanup
binder.bind(TaskStorage.class).toProvider(Providers.of(null));
binder.bind(TaskMaster.class).toProvider(Providers.of(null));
binder.bind(RowIngestionMetersFactory.class).toProvider(Providers.of(null));
}
}
@Provides