Fix NPE in TaskLockbox that prevents overlord leadership (#6512)

* fix NPE that prevents overlord from assuming leadership if extension that provides indexing task type is not loaded

* heh
This commit is contained in:
Clint Wylie 2018-10-25 13:06:11 -07:00 committed by Jihoon Son
parent b2d9b6f23d
commit e1057ad47a
2 changed files with 99 additions and 1 deletions

View File

@ -190,9 +190,11 @@ public class MetadataTaskStorage implements TaskStorage
@Override @Override
public List<Task> getActiveTasks() public List<Task> getActiveTasks()
{ {
// filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module
// and don't know what to do with the payload, so we won't be able to make use of it anyway
return handler.getActiveTaskInfo(null) return handler.getActiveTaskInfo(null)
.stream() .stream()
.filter(taskInfo -> taskInfo.getStatus().isRunnable()) .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null)
.map(TaskInfo::getTask) .map(TaskInfo::getTask)
.collect(Collectors.toList()); .collect(Collectors.toList());
} }

View File

@ -23,11 +23,16 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
@ -342,6 +347,46 @@ public class TaskLockboxTest
lockbox.syncFromStorage(); lockbox.syncFromStorage();
} }
@Test
public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() throws Exception
{
// ensure that if we don't know how to deserialize a task it won't explode the lockbox
// (or anything else that uses taskStorage.getActiveTasks() and doesn't expect null which is most things)
final TestDerbyConnector derbyConnector = derby.getConnector();
ObjectMapper loadedMapper = new DefaultObjectMapper().registerModule(new TheModule());
TaskStorage loadedTaskStorage = new MetadataTaskStorage(
derbyConnector,
new TaskStorageConfig(null),
new DerbyMetadataStorageActionHandlerFactory(
derbyConnector,
derby.metadataTablesConfigSupplier().get(),
loadedMapper
)
);
TaskLockbox theBox = new TaskLockbox(taskStorage);
TaskLockbox loadedBox = new TaskLockbox(loadedTaskStorage);
Task aTask = NoopTask.create();
taskStorage.insert(aTask, TaskStatus.running(aTask.getId()));
theBox.add(aTask);
loadedBox.add(aTask);
Task theTask = new MyModuleIsntLoadedTask("1", "yey", null, "foo");
loadedTaskStorage.insert(theTask, TaskStatus.running(theTask.getId()));
theBox.add(theTask);
loadedBox.add(theTask);
List<Task> tasks = taskStorage.getActiveTasks();
List<Task> tasksFromLoaded = loadedTaskStorage.getActiveTasks();
theBox.syncFromStorage();
loadedBox.syncFromStorage();
Assert.assertEquals(1, tasks.size());
Assert.assertEquals(2, tasksFromLoaded.size());
}
@Test @Test
public void testRevokedLockSyncFromStorage() throws EntryExistsException public void testRevokedLockSyncFromStorage() throws EntryExistsException
{ {
@ -648,4 +693,55 @@ public class TaskLockboxTest
return super.isRevoked(); return super.isRevoked();
} }
} }
private static String TASK_NAME = "myModuleIsntLoadedTask";
private static class TheModule extends SimpleModule
{
public TheModule()
{
registerSubtypes(new NamedType(MyModuleIsntLoadedTask.class, TASK_NAME));
}
}
private static class MyModuleIsntLoadedTask extends AbstractTask
{
private String someProp;
@JsonCreator
protected MyModuleIsntLoadedTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("someProp") String someProp
)
{
super(id, dataSource, context);
this.someProp = someProp;
}
@JsonProperty
public String getSomeProp()
{
return someProp;
}
@Override
public String getType()
{
return TASK_NAME;
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
return true;
}
@Override
public TaskStatus run(TaskToolbox toolbox)
{
return TaskStatus.failure("how?");
}
}
} }