mirror of https://github.com/apache/druid.git
configurable shutdownTimeout for Kakfa supervisor (#3497)
* configurable shutdownTimeout * cr change
This commit is contained in:
parent
ca9114b41b
commit
9226d4af3c
|
@ -127,6 +127,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|
|||
|`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))|
|
||||
|`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.|no (default == 8)|
|
||||
|`httpTimeout`|ISO8601 Period|How long to wait for a HTTP response from an indexing task.|no (default == PT10S)|
|
||||
|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
|
||||
|
||||
#### IndexSpec
|
||||
|
||||
|
|
|
@ -101,7 +101,6 @@ public class KafkaSupervisor implements Supervisor
|
|||
private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
|
||||
private static final Random RANDOM = new Random();
|
||||
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000; // prevent us from running too often in response to events
|
||||
private static final int SHUTDOWN_TIMEOUT_MILLIS = 15000;
|
||||
private static final long NOT_SET = -1;
|
||||
|
||||
// Internal data structures
|
||||
|
@ -365,11 +364,12 @@ public class KafkaSupervisor implements Supervisor
|
|||
notices.add(new ShutdownNotice());
|
||||
}
|
||||
|
||||
long endTime = System.currentTimeMillis() + SHUTDOWN_TIMEOUT_MILLIS;
|
||||
long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis();
|
||||
long endTime = System.currentTimeMillis() + shutdownTimeoutMillis;
|
||||
while (!stopped) {
|
||||
long sleepTime = endTime - System.currentTimeMillis();
|
||||
if (sleepTime <= 0) {
|
||||
log.info("Timed out while waiting for shutdown");
|
||||
log.info("Timed out while waiting for shutdown (timeout [%,dms])", shutdownTimeoutMillis);
|
||||
stopped = true;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -73,6 +73,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
|
||||
|
|
|
@ -33,6 +33,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
|
|||
private final Integer chatThreads;
|
||||
private final Long chatRetries;
|
||||
private final Duration httpTimeout;
|
||||
private final Duration shutdownTimeout;
|
||||
|
||||
public KafkaSupervisorTuningConfig(
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
|
@ -47,7 +48,8 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
|
|||
@JsonProperty("workerThreads") Integer workerThreads,
|
||||
@JsonProperty("chatThreads") Integer chatThreads,
|
||||
@JsonProperty("chatRetries") Long chatRetries,
|
||||
@JsonProperty("httpTimeout") Period httpTimeout
|
||||
@JsonProperty("httpTimeout") Period httpTimeout,
|
||||
@JsonProperty("shutdownTimeout") Period shutdownTimeout
|
||||
)
|
||||
{
|
||||
super(
|
||||
|
@ -66,6 +68,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
|
|||
this.chatThreads = chatThreads;
|
||||
this.chatRetries = (chatRetries != null ? chatRetries : 8);
|
||||
this.httpTimeout = defaultDuration(httpTimeout, "PT10S");
|
||||
this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S");
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -92,6 +95,12 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
|
|||
return httpTimeout;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Duration getShutdownTimeout()
|
||||
{
|
||||
return shutdownTimeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -109,6 +118,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
|
|||
", chatThreads=" + chatThreads +
|
||||
", chatRetries=" + chatRetries +
|
||||
", httpTimeout=" + httpTimeout +
|
||||
", shutdownTimeout=" + shutdownTimeout +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
|
|
@ -112,6 +112,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
private static final int TEST_CHAT_THREADS = 3;
|
||||
private static final long TEST_CHAT_RETRIES = 9L;
|
||||
private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
|
||||
private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
|
||||
|
||||
private int numThreads;
|
||||
private TestingCluster zkServer;
|
||||
|
@ -177,7 +178,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
numThreads,
|
||||
TEST_CHAT_THREADS,
|
||||
TEST_CHAT_RETRIES,
|
||||
TEST_HTTP_TIMEOUT
|
||||
TEST_HTTP_TIMEOUT,
|
||||
TEST_SHUTDOWN_TIMEOUT
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -70,6 +70,7 @@ public class KafkaSupervisorTuningConfigTest
|
|||
Assert.assertNull(config.getChatThreads());
|
||||
Assert.assertEquals(8L, (long) config.getChatRetries());
|
||||
Assert.assertEquals(Duration.standardSeconds(10), config.getHttpTimeout());
|
||||
Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -88,7 +89,8 @@ public class KafkaSupervisorTuningConfigTest
|
|||
+ " \"workerThreads\": 12,\n"
|
||||
+ " \"chatThreads\": 13,\n"
|
||||
+ " \"chatRetries\": 14,\n"
|
||||
+ " \"httpTimeout\": \"PT15S\"\n"
|
||||
+ " \"httpTimeout\": \"PT15S\",\n"
|
||||
+ " \"shutdownTimeout\": \"PT95S\"\n"
|
||||
+ "}";
|
||||
|
||||
KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue(
|
||||
|
@ -113,5 +115,6 @@ public class KafkaSupervisorTuningConfigTest
|
|||
Assert.assertEquals(13, (int) config.getChatThreads());
|
||||
Assert.assertEquals(14L, (long) config.getChatRetries());
|
||||
Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout());
|
||||
Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue