diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/VersionedSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/VersionedSupervisorSpec.java index d589d852604..51f6a7ef14d 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/VersionedSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/VersionedSupervisorSpec.java @@ -22,19 +22,26 @@ package org.apache.druid.indexing.overlord.supervisor; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import javax.annotation.Nullable; + public class VersionedSupervisorSpec { + @Nullable private final SupervisorSpec spec; private final String version; @JsonCreator - public VersionedSupervisorSpec(@JsonProperty("spec") SupervisorSpec spec, @JsonProperty("version") String version) + public VersionedSupervisorSpec( + @JsonProperty("spec") @Nullable SupervisorSpec spec, + @JsonProperty("version") String version + ) { this.spec = spec; this.version = version; } @JsonProperty + @Nullable public SupervisorSpec getSpec() { return spec; diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java index 871cb5dc14a..83fcc0505c5 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java @@ -19,7 +19,9 @@ package org.apache.druid.metadata; +import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; @@ -32,6 +34,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.logger.Logger; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -51,6 +54,8 @@ import java.util.Map; @ManageLifecycle public class SQLMetadataSupervisorManager implements MetadataSupervisorManager { + private static final Logger log = new Logger(SQLMetadataSupervisorManager.class); + private final ObjectMapper jsonMapper; private final SQLMetadataConnector connector; private final Supplier dbTables; @@ -124,21 +129,27 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager public Pair map(int index, ResultSet r, StatementContext ctx) throws SQLException { + SupervisorSpec payload; try { - SupervisorSpec payload = jsonMapper.readValue( + payload = jsonMapper.readValue( r.getBytes("payload"), new TypeReference() { } ); - return Pair.of( - r.getString("spec_id"), - new VersionedSupervisorSpec(payload, r.getString("created_date")) - ); + } + catch (JsonParseException | JsonMappingException e) { + log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id")); + payload = null; } catch (IOException e) { throw new RuntimeException(e); } + + return Pair.of( + r.getString("spec_id"), + new VersionedSupervisorSpec(payload, r.getString("created_date")) + ); } } ).fold( diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index 30f35e34c04..afc5f531cd7 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -22,6 +22,7 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.VersionedSupervisorSpec; import org.apache.druid.jackson.DefaultObjectMapper; @@ -35,6 +36,7 @@ import org.junit.Test; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.tweak.HandleCallback; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -55,6 +57,23 @@ public class SQLMetadataSupervisorManagerTest mapper.registerSubtypes(TestSupervisorSpec.class); } + @After + public void cleanup() + { + connector.getDBI().withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) + { + handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable())) + .execute(); + return null; + } + } + ); + } + @Before public void setUp() { @@ -118,20 +137,58 @@ public class SQLMetadataSupervisorManagerTest Assert.assertEquals(data2rev2, ((TestSupervisorSpec) latestSpecs.get(supervisor2)).getData()); } - @After - public void cleanup() + @Test + public void testSkipDeserializingBadSpecs() { - connector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - handle.createStatement(StringUtils.format("DROP TABLE %s", tablesConfig.getSupervisorTable())) - .execute(); - return null; - } - } - ); + final String supervisor1 = "test-supervisor-1"; + final String supervisor2 = "test-supervisor-2"; + final Map data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1"); + final Map data1rev2 = ImmutableMap.of("key1-1", "value1-1-2", "key1-2", "value1-2-2"); + + supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1)); + supervisorManager.insert(supervisor2, new BadSupervisorSpec(supervisor2, supervisor2)); + supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev2)); + + final Map> allSpecs = supervisorManager.getAll(); + + Assert.assertEquals(2, allSpecs.size()); + List specs = allSpecs.get(supervisor1); + Assert.assertEquals(2, specs.size()); + Assert.assertEquals(new TestSupervisorSpec(supervisor1, data1rev2), specs.get(0).getSpec()); + Assert.assertEquals(new TestSupervisorSpec(supervisor1, data1rev1), specs.get(1).getSpec()); + + specs = allSpecs.get(supervisor2); + Assert.assertEquals(1, specs.size()); + Assert.assertNull(specs.get(0).getSpec()); + } + + private static class BadSupervisorSpec implements SupervisorSpec + { + private final String id; + private final String dataSource; + + private BadSupervisorSpec(String id, String dataSource) + { + this.id = id; + this.dataSource = dataSource; + } + + @Override + public String getId() + { + return id; + } + + @Override + public Supervisor createSupervisor() + { + throw new UnsupportedOperationException(); + } + + @Override + public List getDataSources() + { + return Collections.singletonList(dataSource); + } } } diff --git a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java index bb9bf46e230..19802f172dd 100644 --- a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java +++ b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java @@ -25,11 +25,12 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import java.util.List; +import java.util.Objects; public class TestSupervisorSpec implements SupervisorSpec { - private String id; - private Object data; + private final String id; + private final Object data; @JsonCreator public TestSupervisorSpec(@JsonProperty("id") String id, @JsonProperty("data") Object data) @@ -62,4 +63,33 @@ public class TestSupervisorSpec implements SupervisorSpec { return data; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestSupervisorSpec that = (TestSupervisorSpec) o; + return Objects.equals(id, that.id) && + Objects.equals(data, that.data); + } + + @Override + public int hashCode() + { + return Objects.hash(id, data); + } + + @Override + public String toString() + { + return "TestSupervisorSpec{" + + "id='" + id + '\'' + + ", data=" + data + + '}'; + } }