kubernetes: restart watch on null response (#12233)

* kubernetes: restart watch on null response

Kubernetes watches allow a client to efficiently processes changes to
resources. However, they have some idiosyncrasies. In particular, they
can error out for various reasons leading to what would normally be seen
as an invalid result.

The Druid kubernetes node discovery subsystem does not handle a certain
case properly. The watch can return an item with a null object.  These
leads to a null pointer exception. When this happens, the provider needs
to restart the watch, because rerunning the watch from the same resource
version leads to the same result: yet another null pointer exception.

This commit changes the provider to handle null objects by restarting
the watch.

* review: add more coverage

This adds a bit more coverage to the K8sDruidNodeDiscoveryProvider watch
loop, and removes an unnecessay return.

* kubernetes: reduce logging verbosity

The log messages about items being NULL don't really deserve to be at a
level other than DEBUG since they are not actionable, particularly since
we automatically recover now. Move them to the DEBUG level.
This commit is contained in:
Kyle Larose 2022-03-10 15:56:40 -05:00 committed by GitHub
parent cb2b2b696d
commit db91961af7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 154 additions and 6 deletions

View File

@ -132,12 +132,22 @@ public class DefaultK8sApiClient implements K8sApiClient
while (watch.hasNext()) {
Watch.Response<V1Pod> item = watch.next();
if (item != null && item.type != null) {
obj = new Watch.Response<DiscoveryDruidNodeAndResourceVersion>(
item.type,
new DiscoveryDruidNodeAndResourceVersion(
DiscoveryDruidNodeAndResourceVersion result = null;
if (item.object != null) {
result = new DiscoveryDruidNodeAndResourceVersion(
item.object.getMetadata().getResourceVersion(),
getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
)
);
} else {
// The item's object can be null in some cases -- likely due to a blip
// in the k8s watch. Handle that by passing the null upwards. The caller
// needs to know that the object can be null.
LOGGER.debug("item of type " + item.type + " was NULL when watching nodeRole [%s]", nodeRole);
}
obj = new Watch.Response<DiscoveryDruidNodeAndResourceVersion>(
item.type,
result
);
return true;
} else {

View File

@ -267,7 +267,7 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
try {
while (iter.hasNext()) {
Watch.Response<DiscoveryDruidNodeAndResourceVersion> item = iter.next();
if (item != null && item.type != null) {
if (item != null && item.type != null && item.object != null) {
switch (item.type) {
case WatchResult.ADDED:
baseNodeRoleWatcher.childAdded(item.object.getNode());
@ -282,7 +282,10 @@ public class K8sDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider
nextResourceVersion = item.object.getResourceVersion();
} else {
LOGGER.error("WTH! item or item.type is NULL");
// Try again by starting the watch from the beginning. This can happen if the
// watch goes bad.
LOGGER.debug("Received NULL item while watching node type [%s]. Restarting watch.", this.nodeRole);
return;
}
}
}

View File

@ -34,6 +34,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -162,6 +163,140 @@ public class K8sDruidNodeDiscoveryProviderTest
discoveryProvider.stop();
}
@Test(timeout = 10_000)
public void testNodeRoleWatcherHandlesNullFromAPIByRestarting() throws Exception
{
String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
new DiscoveryDruidNodeList(
"v1",
ImmutableMap.of(
testNode1.getDruidNode().getHostAndPortToUse(), testNode1,
testNode2.getDruidNode().getHostAndPortToUse(), testNode2
)
)
);
EasyMock.expect(mockK8sApiClient.watchPods(
podInfo.getPodNamespace(), labelSelector, "v1", NodeRole.ROUTER)).andReturn(
new MockWatchResult(
ImmutableList.of(
new Watch.Response<>(WatchResult.ADDED, null)
),
false,
false
)
);
EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
new DiscoveryDruidNodeList(
"v2",
ImmutableMap.of(
testNode2.getDruidNode().getHostAndPortToUse(), testNode2,
testNode3.getDruidNode().getHostAndPortToUse(), testNode3
)
)
);
EasyMock.replay(mockK8sApiClient);
K8sDruidNodeDiscoveryProvider discoveryProvider = new K8sDruidNodeDiscoveryProvider(
podInfo,
discoveryConfig,
mockK8sApiClient,
1
);
discoveryProvider.start();
K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery = discoveryProvider.getForNodeRole(NodeRole.ROUTER, false);
MockListener testListener = new MockListener(
ImmutableList.of(
MockListener.Event.added(testNode1),
MockListener.Event.added(testNode2),
MockListener.Event.inited(),
MockListener.Event.added(testNode3),
MockListener.Event.deleted(testNode1)
)
);
nodeDiscovery.registerListener(testListener);
nodeDiscovery.start();
testListener.assertSuccess();
discoveryProvider.stop();
}
@Test(timeout = 10_000)
public void testNodeRoleWatcherLoopOnNullItems() throws Exception
{
String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true";
K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class);
EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn(
new DiscoveryDruidNodeList(
"v1",
ImmutableMap.of(
testNode1.getDruidNode().getHostAndPortToUse(), testNode1,
testNode2.getDruidNode().getHostAndPortToUse(), testNode2
)
)
);
List<Watch.Response<DiscoveryDruidNodeAndResourceVersion>> nullList = new ArrayList<Watch.Response<DiscoveryDruidNodeAndResourceVersion>>();
nullList.add(null);
EasyMock.expect(mockK8sApiClient.watchPods(
podInfo.getPodNamespace(), labelSelector, "v1", NodeRole.ROUTER)).andReturn(
new MockWatchResult(
nullList,
false,
false
)
);
EasyMock.expect(mockK8sApiClient.watchPods(
podInfo.getPodNamespace(), labelSelector, "v1", NodeRole.ROUTER)).andReturn(
new MockWatchResult(
ImmutableList.of(
new Watch.Response<>(null, new DiscoveryDruidNodeAndResourceVersion("v2", testNode4))
),
false,
false
)
);
EasyMock.expect(mockK8sApiClient.watchPods(
podInfo.getPodNamespace(), labelSelector, "v2", NodeRole.ROUTER)).andReturn(
new MockWatchResult(
ImmutableList.of(
new Watch.Response<>(WatchResult.ADDED, new DiscoveryDruidNodeAndResourceVersion("v2", testNode4))
),
false,
false
)
);
EasyMock.replay(mockK8sApiClient);
K8sDruidNodeDiscoveryProvider discoveryProvider = new K8sDruidNodeDiscoveryProvider(
podInfo,
discoveryConfig,
mockK8sApiClient,
1
);
discoveryProvider.start();
K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery = discoveryProvider.getForNodeRole(NodeRole.ROUTER, false);
MockListener testListener = new MockListener(
ImmutableList.of(
MockListener.Event.added(testNode1),
MockListener.Event.added(testNode2)
)
);
nodeDiscovery.registerListener(testListener);
nodeDiscovery.start();
testListener.assertSuccess();
discoveryProvider.stop();
}
private static class MockListener implements DruidNodeDiscovery.Listener
{
List<Event> events;