mirror of https://github.com/apache/druid.git
Minor log cleanup in K8sDruidNodeDiscoveryProvider (#16701)
This commit is contained in:
parent
bf2be938a9
commit
7c6f2b1e20
|
@ -45,7 +45,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.function.BooleanSupplier;
|
import java.util.function.BooleanSupplier;
|
||||||
|
|
||||||
@ManageLifecycle
|
@ManageLifecycle
|
||||||
|
@ -116,7 +115,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
|
||||||
return nodeTypeWatchers.computeIfAbsent(
|
return nodeTypeWatchers.computeIfAbsent(
|
||||||
nodeType,
|
nodeType,
|
||||||
nType -> {
|
nType -> {
|
||||||
LOGGER.info("Creating NodeRoleWatcher for nodeRole [%s].", nType);
|
LOGGER.info("Creating NodeRoleWatcher for role[%s].", nType);
|
||||||
NodeRoleWatcher nodeRoleWatcher = new NodeRoleWatcher(
|
NodeRoleWatcher nodeRoleWatcher = new NodeRoleWatcher(
|
||||||
listenerExecutor,
|
listenerExecutor,
|
||||||
nType,
|
nType,
|
||||||
|
@ -128,7 +127,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
|
||||||
if (startAfterCreation) {
|
if (startAfterCreation) {
|
||||||
nodeRoleWatcher.start();
|
nodeRoleWatcher.start();
|
||||||
}
|
}
|
||||||
LOGGER.info("Created NodeRoleWatcher for nodeRole [%s].", nType);
|
LOGGER.info("Created NodeRoleWatcher for role[%s].", nType);
|
||||||
return nodeRoleWatcher;
|
return nodeRoleWatcher;
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
@ -188,7 +187,6 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
|
||||||
|
|
||||||
private final LifecycleLock lifecycleLock = new LifecycleLock();
|
private final LifecycleLock lifecycleLock = new LifecycleLock();
|
||||||
|
|
||||||
private final AtomicReference<Closeable> watchRef = new AtomicReference<>();
|
|
||||||
private static final Closeable STOP_MARKER = () -> {};
|
private static final Closeable STOP_MARKER = () -> {};
|
||||||
|
|
||||||
private final NodeRole nodeRole;
|
private final NodeRole nodeRole;
|
||||||
|
@ -221,7 +219,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
|
||||||
boolean cacheInitialized = false;
|
boolean cacheInitialized = false;
|
||||||
|
|
||||||
if (!lifecycleLock.awaitStarted()) {
|
if (!lifecycleLock.awaitStarted()) {
|
||||||
LOGGER.error("Lifecycle not started, Exited Watch for NodeRole [%s].", nodeRole);
|
LOGGER.error("Lifecycle not started, Exited Watch for role[%s].", nodeRole);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,23 +234,22 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
keepWatching(
|
keepWatching(
|
||||||
podInfo.getPodNamespace(),
|
|
||||||
labelSelector,
|
labelSelector,
|
||||||
list.getResourceVersion()
|
list.getResourceVersion()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
catch (Throwable ex) {
|
catch (Throwable ex) {
|
||||||
LOGGER.error(ex, "Expection while watching for NodeRole [%s].", nodeRole);
|
LOGGER.error(ex, "Exception while watching for role[%s].", nodeRole);
|
||||||
|
|
||||||
// Wait a little before trying again.
|
// Wait a little before trying again.
|
||||||
sleep(watcherErrorRetryWaitMS);
|
sleep(watcherErrorRetryWaitMS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.info("Exited Watch for NodeRole [%s].", nodeRole);
|
LOGGER.info("Exited Watch for role[%s].", nodeRole);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void keepWatching(String namespace, String labelSelector, String resourceVersion)
|
private void keepWatching(String labelSelector, String resourceVersion)
|
||||||
{
|
{
|
||||||
String nextResourceVersion = resourceVersion;
|
String nextResourceVersion = resourceVersion;
|
||||||
while (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
|
while (lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
|
||||||
|
@ -285,7 +282,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
|
||||||
} else {
|
} else {
|
||||||
// Try again by starting the watch from the beginning. This can happen if the
|
// Try again by starting the watch from the beginning. This can happen if the
|
||||||
// watch goes bad.
|
// watch goes bad.
|
||||||
LOGGER.debug("Received NULL item while watching node type [%s]. Restarting watch.", this.nodeRole);
|
LOGGER.debug("Received NULL item while watching role[%s]. Restarting watch.", this.nodeRole);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -301,7 +298,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
|
||||||
sleep(watcherErrorRetryWaitMS);
|
sleep(watcherErrorRetryWaitMS);
|
||||||
}
|
}
|
||||||
catch (Throwable ex) {
|
catch (Throwable ex) {
|
||||||
LOGGER.error(ex, "Error while watching node type [%s]", this.nodeRole);
|
LOGGER.error(ex, "Error while watching role[%s]", this.nodeRole);
|
||||||
sleep(watcherErrorRetryWaitMS);
|
sleep(watcherErrorRetryWaitMS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -324,11 +321,11 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
LOGGER.info("Starting NodeRoleWatcher for [%s]...", nodeRole);
|
LOGGER.info("Starting NodeRoleWatcher for role[%s]...", nodeRole);
|
||||||
this.watchExecutor = Execs.singleThreaded(this.getClass().getName() + nodeRole.getJsonName());
|
this.watchExecutor = Execs.singleThreaded(this.getClass().getName() + nodeRole.getJsonName());
|
||||||
watchExecutor.submit(this::watch);
|
watchExecutor.submit(this::watch);
|
||||||
lifecycleLock.started();
|
lifecycleLock.started();
|
||||||
LOGGER.info("Started NodeRoleWatcher for [%s].", nodeRole);
|
LOGGER.info("Started NodeRoleWatcher for role[%s].", nodeRole);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
lifecycleLock.exitStart();
|
lifecycleLock.exitStart();
|
||||||
|
@ -343,18 +340,18 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
LOGGER.info("Stopping NodeRoleWatcher for [%s]...", nodeRole);
|
LOGGER.info("Stopping NodeRoleWatcher for role[%s]...", nodeRole);
|
||||||
// STOP_MARKER cannot throw exceptions on close(), so this is OK.
|
// STOP_MARKER cannot throw exceptions on close(), so this is OK.
|
||||||
CloseableUtils.closeAndSuppressExceptions(STOP_MARKER, e -> {});
|
CloseableUtils.closeAndSuppressExceptions(STOP_MARKER, e -> {});
|
||||||
watchExecutor.shutdownNow();
|
watchExecutor.shutdownNow();
|
||||||
|
|
||||||
if (!watchExecutor.awaitTermination(15, TimeUnit.SECONDS)) {
|
if (!watchExecutor.awaitTermination(15, TimeUnit.SECONDS)) {
|
||||||
LOGGER.warn("Failed to stop watchExecutor for NodeRoleWatcher[%s]", nodeRole);
|
LOGGER.warn("Failed to stop watchExecutor for role[%s]", nodeRole);
|
||||||
}
|
}
|
||||||
LOGGER.info("Stopped NodeRoleWatcher for [%s].", nodeRole);
|
LOGGER.info("Stopped NodeRoleWatcher for role[%s].", nodeRole);
|
||||||
}
|
}
|
||||||
catch (Exception ex) {
|
catch (Exception ex) {
|
||||||
LOGGER.error(ex, "Failed to stop NodeRoleWatcher for [%s].", nodeRole);
|
LOGGER.error(ex, "Failed to stop NodeRoleWatcher for role[%s].", nodeRole);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue