mirror of https://github.com/apache/druid.git
Miscellaneous cleanup in the supervisor API flow. (#17144)
Extracting a few miscellaneous non-functional changes from the batch supervisor branch: - Replace anonymous inner classes with lambda expressions in the SQL supervisor manager layer - Add explicit @Nullable annotations in DynamicConfigProviderUtils to make IDE happy - Small variable renames (copy-paste error perhaps) and fix typos - Add table name for this exception message: Delete the supervisor from the table[%s] in the database... - Prefer CollectionUtils.isEmptyOrNull() over list == null || list.size() > 0. We can change the Precondition checks to throwing DruidException separately for a batch of APIs at a time.
This commit is contained in:
parent
d1bfabbf4d
commit
83299e9882
|
@ -50,6 +50,7 @@ import org.apache.druid.server.security.ForbiddenException;
|
|||
import org.apache.druid.server.security.Resource;
|
||||
import org.apache.druid.server.security.ResourceAction;
|
||||
import org.apache.druid.server.security.ResourceType;
|
||||
import org.apache.druid.utils.CollectionUtils;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -123,7 +124,7 @@ public class SupervisorResource
|
|||
return asLeaderWithSupervisorManager(
|
||||
manager -> {
|
||||
Preconditions.checkArgument(
|
||||
spec.getDataSources() != null && spec.getDataSources().size() > 0,
|
||||
!CollectionUtils.isNullOrEmpty(spec.getDataSources()),
|
||||
"No dataSources found to perform authorization checks"
|
||||
);
|
||||
final Set<ResourceAction> resourceActions;
|
||||
|
@ -412,7 +413,7 @@ public class SupervisorResource
|
|||
public Response handoffTaskGroups(@PathParam("id") final String id, @Nonnull final HandoffTaskGroupsRequest handoffTaskGroupsRequest)
|
||||
{
|
||||
List<Integer> taskGroupIds = handoffTaskGroupsRequest.getTaskGroupIds();
|
||||
if (taskGroupIds == null || taskGroupIds.isEmpty()) {
|
||||
if (CollectionUtils.isNullOrEmpty(taskGroupIds)) {
|
||||
return Response.status(Response.Status.BAD_REQUEST)
|
||||
.entity(ImmutableMap.of("error", "List of task groups to handoff can't be empty"))
|
||||
.build();
|
||||
|
@ -533,7 +534,7 @@ public class SupervisorResource
|
|||
authorizerMapper
|
||||
)
|
||||
);
|
||||
if (authorizedHistoryForId.size() > 0) {
|
||||
if (!authorizedHistoryForId.isEmpty()) {
|
||||
return Response.ok(authorizedHistoryForId).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,13 +22,14 @@ package org.apache.druid.utils;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.druid.metadata.DynamicConfigProvider;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class DynamicConfigProviderUtils
|
||||
{
|
||||
public static Map<String, String> extraConfigAndSetStringMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
|
||||
public static Map<String, String> extraConfigAndSetStringMap(@Nullable Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
|
||||
{
|
||||
HashMap<String, String> newConfig = new HashMap<>();
|
||||
if (config != null) {
|
||||
|
@ -43,7 +44,7 @@ public class DynamicConfigProviderUtils
|
|||
return newConfig;
|
||||
}
|
||||
|
||||
public static Map<String, Object> extraConfigAndSetObjectMap(Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
|
||||
public static Map<String, Object> extraConfigAndSetObjectMap(@Nullable Map<String, Object> config, String dynamicConfigProviderKey, ObjectMapper mapper)
|
||||
{
|
||||
HashMap<String, Object> newConfig = new HashMap<>();
|
||||
if (config != null) {
|
||||
|
@ -58,7 +59,7 @@ public class DynamicConfigProviderUtils
|
|||
return newConfig;
|
||||
}
|
||||
|
||||
private static Map<String, String> extraConfigFromProvider(Object dynamicConfigProviderJson, ObjectMapper mapper)
|
||||
private static Map<String, String> extraConfigFromProvider(@Nullable Object dynamicConfigProviderJson, ObjectMapper mapper)
|
||||
{
|
||||
if (dynamicConfigProviderJson != null) {
|
||||
DynamicConfigProvider dynamicConfigProvider = mapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.apache.druid.metadata;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Supplier;
|
||||
|
@ -38,9 +37,7 @@ import org.apache.druid.java.util.common.StringUtils;
|
|||
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.joda.time.DateTime;
|
||||
import org.skife.jdbi.v2.FoldController;
|
||||
import org.skife.jdbi.v2.Folder3;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.IDBI;
|
||||
import org.skife.jdbi.v2.PreparedBatch;
|
||||
import org.skife.jdbi.v2.StatementContext;
|
||||
|
@ -91,24 +88,19 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
|
|||
public void insert(final String id, final SupervisorSpec spec)
|
||||
{
|
||||
dbi.withHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle) throws Exception
|
||||
{
|
||||
handle.createStatement(
|
||||
StringUtils.format(
|
||||
"INSERT INTO %s (spec_id, created_date, payload) VALUES (:spec_id, :created_date, :payload)",
|
||||
getSupervisorsTable()
|
||||
)
|
||||
)
|
||||
.bind("spec_id", id)
|
||||
.bind("created_date", DateTimes.nowUtc().toString())
|
||||
.bind("payload", jsonMapper.writeValueAsBytes(spec))
|
||||
.execute();
|
||||
handle -> {
|
||||
handle.createStatement(
|
||||
StringUtils.format(
|
||||
"INSERT INTO %s (spec_id, created_date, payload) VALUES (:spec_id, :created_date, :payload)",
|
||||
getSupervisorsTable()
|
||||
)
|
||||
)
|
||||
.bind("spec_id", id)
|
||||
.bind("created_date", DateTimes.nowUtc().toString())
|
||||
.bind("payload", jsonMapper.writeValueAsBytes(spec))
|
||||
.execute();
|
||||
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
@ -118,54 +110,29 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
|
|||
{
|
||||
return ImmutableMap.copyOf(
|
||||
dbi.withHandle(
|
||||
new HandleCallback<Map<String, List<VersionedSupervisorSpec>>>()
|
||||
{
|
||||
@Override
|
||||
public Map<String, List<VersionedSupervisorSpec>> withHandle(Handle handle)
|
||||
{
|
||||
return handle.createQuery(
|
||||
StringUtils.format(
|
||||
"SELECT id, spec_id, created_date, payload FROM %1$s ORDER BY id DESC",
|
||||
getSupervisorsTable()
|
||||
)
|
||||
).map(
|
||||
new ResultSetMapper<Pair<String, VersionedSupervisorSpec>>()
|
||||
{
|
||||
@Override
|
||||
public Pair<String, VersionedSupervisorSpec> map(int index, ResultSet r, StatementContext ctx)
|
||||
throws SQLException
|
||||
{
|
||||
return Pair.of(
|
||||
r.getString("spec_id"),
|
||||
createVersionSupervisorSpecFromResponse(r)
|
||||
);
|
||||
}
|
||||
}
|
||||
).fold(
|
||||
new HashMap<>(),
|
||||
new Folder3<Map<String, List<VersionedSupervisorSpec>>, Pair<String, VersionedSupervisorSpec>>()
|
||||
{
|
||||
@Override
|
||||
public Map<String, List<VersionedSupervisorSpec>> fold(
|
||||
Map<String, List<VersionedSupervisorSpec>> retVal,
|
||||
Pair<String, VersionedSupervisorSpec> pair,
|
||||
FoldController foldController,
|
||||
StatementContext statementContext
|
||||
)
|
||||
{
|
||||
try {
|
||||
String specId = pair.lhs;
|
||||
retVal.computeIfAbsent(specId, sId -> new ArrayList<>()).add(pair.rhs);
|
||||
return retVal;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
(HandleCallback<Map<String, List<VersionedSupervisorSpec>>>) handle -> handle.createQuery(
|
||||
StringUtils.format(
|
||||
"SELECT id, spec_id, created_date, payload FROM %1$s ORDER BY id DESC",
|
||||
getSupervisorsTable()
|
||||
)
|
||||
).map(
|
||||
(index, r, ctx) -> Pair.of(
|
||||
r.getString("spec_id"),
|
||||
createVersionSupervisorSpecFromResponse(r)
|
||||
)
|
||||
).fold(
|
||||
new HashMap<>(),
|
||||
(Folder3<Map<String, List<VersionedSupervisorSpec>>, Pair<String, VersionedSupervisorSpec>>) (retVal, pair, foldController, statementContext) -> {
|
||||
try {
|
||||
String specId = pair.lhs;
|
||||
retVal.computeIfAbsent(specId, sId -> new ArrayList<>()).add(pair.rhs);
|
||||
return retVal;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -175,30 +142,15 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
|
|||
{
|
||||
return ImmutableList.copyOf(
|
||||
dbi.withHandle(
|
||||
new HandleCallback<List<VersionedSupervisorSpec>>()
|
||||
{
|
||||
@Override
|
||||
public List<VersionedSupervisorSpec> withHandle(Handle handle)
|
||||
{
|
||||
return handle.createQuery(
|
||||
StringUtils.format(
|
||||
"SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC",
|
||||
getSupervisorsTable()
|
||||
)
|
||||
).bind("spec_id", id
|
||||
).map(
|
||||
new ResultSetMapper<VersionedSupervisorSpec>()
|
||||
{
|
||||
@Override
|
||||
public VersionedSupervisorSpec map(int index, ResultSet r, StatementContext ctx)
|
||||
throws SQLException
|
||||
{
|
||||
return createVersionSupervisorSpecFromResponse(r);
|
||||
}
|
||||
}
|
||||
).list();
|
||||
}
|
||||
}
|
||||
(HandleCallback<List<VersionedSupervisorSpec>>) handle -> handle.createQuery(
|
||||
StringUtils.format(
|
||||
"SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC",
|
||||
getSupervisorsTable()
|
||||
)
|
||||
)
|
||||
.bind("spec_id", id)
|
||||
.map((index, r, ctx) -> createVersionSupervisorSpecFromResponse(r))
|
||||
.list()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -207,12 +159,7 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
|
|||
{
|
||||
SupervisorSpec payload;
|
||||
try {
|
||||
payload = jsonMapper.readValue(
|
||||
r.getBytes("payload"),
|
||||
new TypeReference<SupervisorSpec>()
|
||||
{
|
||||
}
|
||||
);
|
||||
payload = jsonMapper.readValue(r.getBytes("payload"), SupervisorSpec.class);
|
||||
}
|
||||
catch (JsonParseException | JsonMappingException e) {
|
||||
log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id"));
|
||||
|
@ -229,74 +176,54 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
|
|||
{
|
||||
return ImmutableMap.copyOf(
|
||||
dbi.withHandle(
|
||||
new HandleCallback<Map<String, SupervisorSpec>>()
|
||||
{
|
||||
@Override
|
||||
public Map<String, SupervisorSpec> withHandle(Handle handle)
|
||||
{
|
||||
return handle.createQuery(
|
||||
StringUtils.format(
|
||||
"SELECT r.spec_id, r.payload "
|
||||
+ "FROM %1$s r "
|
||||
+ "INNER JOIN(SELECT spec_id, max(id) as id FROM %1$s GROUP BY spec_id) latest "
|
||||
+ "ON r.id = latest.id",
|
||||
getSupervisorsTable()
|
||||
)
|
||||
).map(
|
||||
new ResultSetMapper<Pair<String, SupervisorSpec>>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Pair<String, SupervisorSpec> map(int index, ResultSet r, StatementContext ctx)
|
||||
throws SQLException
|
||||
{
|
||||
try {
|
||||
return Pair.of(
|
||||
r.getString("spec_id"),
|
||||
jsonMapper.readValue(
|
||||
r.getBytes("payload"), new TypeReference<SupervisorSpec>()
|
||||
{
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
String exceptionMessage = StringUtils.format(
|
||||
"Could not map json payload to a SupervisorSpec for spec_id: [%s]."
|
||||
+ " Delete the supervisor from the database and re-submit it to the overlord.",
|
||||
r.getString("spec_id")
|
||||
);
|
||||
log.error(e, exceptionMessage);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
(HandleCallback<Map<String, SupervisorSpec>>) handle -> handle.createQuery(
|
||||
StringUtils.format(
|
||||
"SELECT r.spec_id, r.payload "
|
||||
+ "FROM %1$s r "
|
||||
+ "INNER JOIN(SELECT spec_id, max(id) as id FROM %1$s GROUP BY spec_id) latest "
|
||||
+ "ON r.id = latest.id",
|
||||
getSupervisorsTable()
|
||||
)
|
||||
).map(
|
||||
new ResultSetMapper<Pair<String, SupervisorSpec>>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public Pair<String, SupervisorSpec> map(int index, ResultSet r, StatementContext ctx)
|
||||
throws SQLException
|
||||
{
|
||||
try {
|
||||
return Pair.of(
|
||||
r.getString("spec_id"),
|
||||
jsonMapper.readValue(r.getBytes("payload"), SupervisorSpec.class)
|
||||
);
|
||||
}
|
||||
).fold(
|
||||
new HashMap<>(),
|
||||
new Folder3<Map<String, SupervisorSpec>, Pair<String, SupervisorSpec>>()
|
||||
{
|
||||
@Override
|
||||
public Map<String, SupervisorSpec> fold(
|
||||
Map<String, SupervisorSpec> retVal,
|
||||
Pair<String, SupervisorSpec> stringObjectMap,
|
||||
FoldController foldController,
|
||||
StatementContext statementContext
|
||||
)
|
||||
{
|
||||
try {
|
||||
if (null != stringObjectMap) {
|
||||
retVal.put(stringObjectMap.lhs, stringObjectMap.rhs);
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
String exceptionMessage = StringUtils.format(
|
||||
"Could not map json payload to a SupervisorSpec for spec_id: [%s]."
|
||||
+ " Delete the supervisor from the table[%s] in the database and re-submit it to the overlord.",
|
||||
r.getString("spec_id"),
|
||||
getSupervisorsTable()
|
||||
);
|
||||
log.error(e, exceptionMessage);
|
||||
return null;
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
).fold(
|
||||
new HashMap<>(),
|
||||
(Folder3<Map<String, SupervisorSpec>, Pair<String, SupervisorSpec>>) (retVal, stringObjectMap, foldController, statementContext) -> {
|
||||
try {
|
||||
if (null != stringObjectMap) {
|
||||
retVal.put(stringObjectMap.lhs, stringObjectMap.rhs);
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -304,10 +231,10 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
|
|||
@Override
|
||||
public Map<String, SupervisorSpec> getLatestActiveOnly()
|
||||
{
|
||||
Map<String, SupervisorSpec> supervisors = getLatest();
|
||||
Map<String, SupervisorSpec> activeSupervisors = new HashMap<>();
|
||||
final Map<String, SupervisorSpec> supervisors = getLatest();
|
||||
final Map<String, SupervisorSpec> activeSupervisors = new HashMap<>();
|
||||
for (Map.Entry<String, SupervisorSpec> entry : supervisors.entrySet()) {
|
||||
// Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec
|
||||
// Terminated supervisor will have its latest supervisorSpec as NoopSupervisorSpec
|
||||
// (NoopSupervisorSpec is used as a tombstone marker)
|
||||
if (!(entry.getValue() instanceof NoopSupervisorSpec)) {
|
||||
activeSupervisors.put(entry.getKey(), entry.getValue());
|
||||
|
@ -319,16 +246,16 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
|
|||
@Override
|
||||
public Map<String, SupervisorSpec> getLatestTerminatedOnly()
|
||||
{
|
||||
Map<String, SupervisorSpec> supervisors = getLatest();
|
||||
Map<String, SupervisorSpec> activeSupervisors = new HashMap<>();
|
||||
final Map<String, SupervisorSpec> supervisors = getLatest();
|
||||
final Map<String, SupervisorSpec> terminatedSupervisors = new HashMap<>();
|
||||
for (Map.Entry<String, SupervisorSpec> entry : supervisors.entrySet()) {
|
||||
// Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec
|
||||
// Terminated supervisor will have its latest supervisorSpec as NoopSupervisorSpec
|
||||
// (NoopSupervisorSpec is used as a tombstone marker)
|
||||
if (entry.getValue() instanceof NoopSupervisorSpec) {
|
||||
activeSupervisors.put(entry.getKey(), entry.getValue());
|
||||
terminatedSupervisors.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
return ImmutableMap.copyOf(activeSupervisors);
|
||||
return ImmutableMap.copyOf(terminatedSupervisors);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -37,8 +37,6 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -65,16 +63,9 @@ public class SQLMetadataSupervisorManagerTest
|
|||
public void cleanup()
|
||||
{
|
||||
connector.getDBI().withHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle)
|
||||
{
|
||||
handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable()))
|
||||
.execute();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
handle -> handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable()))
|
||||
.execute()
|
||||
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue