mirror of https://github.com/apache/druid.git
Ignore bad JSON entries in SQLMetadataSupervisorManager.getAll() (#7278)
This commit is contained in:
parent
1f4ad518d8
commit
e18d5d96d9
|
@ -22,19 +22,26 @@ package org.apache.druid.indexing.overlord.supervisor;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class VersionedSupervisorSpec
|
||||
{
|
||||
@Nullable
|
||||
private final SupervisorSpec spec;
|
||||
private final String version;
|
||||
|
||||
@JsonCreator
|
||||
public VersionedSupervisorSpec(@JsonProperty("spec") SupervisorSpec spec, @JsonProperty("version") String version)
|
||||
public VersionedSupervisorSpec(
|
||||
@JsonProperty("spec") @Nullable SupervisorSpec spec,
|
||||
@JsonProperty("version") String version
|
||||
)
|
||||
{
|
||||
this.spec = spec;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
public SupervisorSpec getSpec()
|
||||
{
|
||||
return spec;
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
|
||||
package org.apache.druid.metadata;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
@ -32,6 +34,7 @@ import org.apache.druid.java.util.common.DateTimes;
|
|||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.skife.jdbi.v2.FoldController;
|
||||
import org.skife.jdbi.v2.Folder3;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
|
@ -51,6 +54,8 @@ import java.util.Map;
|
|||
@ManageLifecycle
|
||||
public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
|
||||
{
|
||||
private static final Logger log = new Logger(SQLMetadataSupervisorManager.class);
|
||||
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final SQLMetadataConnector connector;
|
||||
private final Supplier<MetadataStorageTablesConfig> dbTables;
|
||||
|
@ -124,21 +129,27 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
|
|||
public Pair<String, VersionedSupervisorSpec> map(int index, ResultSet r, StatementContext ctx)
|
||||
throws SQLException
|
||||
{
|
||||
SupervisorSpec payload;
|
||||
try {
|
||||
SupervisorSpec payload = jsonMapper.readValue(
|
||||
payload = jsonMapper.readValue(
|
||||
r.getBytes("payload"),
|
||||
new TypeReference<SupervisorSpec>()
|
||||
{
|
||||
}
|
||||
);
|
||||
return Pair.of(
|
||||
r.getString("spec_id"),
|
||||
new VersionedSupervisorSpec(payload, r.getString("created_date"))
|
||||
);
|
||||
}
|
||||
catch (JsonParseException | JsonMappingException e) {
|
||||
log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id"));
|
||||
payload = null;
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return Pair.of(
|
||||
r.getString("spec_id"),
|
||||
new VersionedSupervisorSpec(payload, r.getString("created_date"))
|
||||
);
|
||||
}
|
||||
}
|
||||
).fold(
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.metadata;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Suppliers;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
|
||||
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
|
||||
import org.apache.druid.indexing.overlord.supervisor.VersionedSupervisorSpec;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
|
@ -35,6 +36,7 @@ import org.junit.Test;
|
|||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -55,6 +57,23 @@ public class SQLMetadataSupervisorManagerTest
|
|||
mapper.registerSubtypes(TestSupervisorSpec.class);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup()
|
||||
{
|
||||
connector.getDBI().withHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle)
|
||||
{
|
||||
handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable()))
|
||||
.execute();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
|
@ -118,20 +137,58 @@ public class SQLMetadataSupervisorManagerTest
|
|||
Assert.assertEquals(data2rev2, ((TestSupervisorSpec) latestSpecs.get(supervisor2)).getData());
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup()
|
||||
@Test
|
||||
public void testSkipDeserializingBadSpecs()
|
||||
{
|
||||
connector.getDBI().withHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle)
|
||||
{
|
||||
handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable()))
|
||||
.execute();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
final String supervisor1 = "test-supervisor-1";
|
||||
final String supervisor2 = "test-supervisor-2";
|
||||
final Map<String, String> data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1");
|
||||
final Map<String, String> data1rev2 = ImmutableMap.of("key1-1", "value1-1-2", "key1-2", "value1-2-2");
|
||||
|
||||
supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1));
|
||||
supervisorManager.insert(supervisor2, new BadSupervisorSpec(supervisor2, supervisor2));
|
||||
supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev2));
|
||||
|
||||
final Map<String, List<VersionedSupervisorSpec>> allSpecs = supervisorManager.getAll();
|
||||
|
||||
Assert.assertEquals(2, allSpecs.size());
|
||||
List<VersionedSupervisorSpec> specs = allSpecs.get(supervisor1);
|
||||
Assert.assertEquals(2, specs.size());
|
||||
Assert.assertEquals(new TestSupervisorSpec(supervisor1, data1rev2), specs.get(0).getSpec());
|
||||
Assert.assertEquals(new TestSupervisorSpec(supervisor1, data1rev1), specs.get(1).getSpec());
|
||||
|
||||
specs = allSpecs.get(supervisor2);
|
||||
Assert.assertEquals(1, specs.size());
|
||||
Assert.assertNull(specs.get(0).getSpec());
|
||||
}
|
||||
|
||||
private static class BadSupervisorSpec implements SupervisorSpec
|
||||
{
|
||||
private final String id;
|
||||
private final String dataSource;
|
||||
|
||||
private BadSupervisorSpec(String id, String dataSource)
|
||||
{
|
||||
this.id = id;
|
||||
this.dataSource = dataSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId()
|
||||
{
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supervisor createSupervisor()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getDataSources()
|
||||
{
|
||||
return Collections.singletonList(dataSource);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,11 +25,12 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
|
|||
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class TestSupervisorSpec implements SupervisorSpec
|
||||
{
|
||||
private String id;
|
||||
private Object data;
|
||||
private final String id;
|
||||
private final Object data;
|
||||
|
||||
@JsonCreator
|
||||
public TestSupervisorSpec(@JsonProperty("id") String id, @JsonProperty("data") Object data)
|
||||
|
@ -62,4 +63,33 @@ public class TestSupervisorSpec implements SupervisorSpec
|
|||
{
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TestSupervisorSpec that = (TestSupervisorSpec) o;
|
||||
return Objects.equals(id, that.id) &&
|
||||
Objects.equals(data, that.data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(id, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "TestSupervisorSpec{" +
|
||||
"id='" + id + '\'' +
|
||||
", data=" + data +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue