diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index b638fcfbd8b..04e76dfccd8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -166,6 +166,11 @@ public class SupervisorManager log.info("SupervisorManager stopped."); } + public List getSupervisorHistoryForId(String id) + { + return metadataSupervisorManager.getAllForId(id); + } + public Map> getSupervisorHistory() { return metadataSupervisorManager.getAll(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 91046661e5d..26baed3feab 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -393,9 +393,8 @@ public class SupervisorResource { return asLeaderWithSupervisorManager( manager -> { - Map> supervisorHistory = manager.getSupervisorHistory(); - Iterable historyForId = supervisorHistory.get(id); - if (historyForId != null) { + List historyForId = manager.getSupervisorHistoryForId(id); + if (!historyForId.isEmpty()) { final List authorizedHistoryForId = Lists.newArrayList( AuthorizationUtils.filterAuthorizedResources( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 6b93f84d162..498b6cfa4c1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.overlord.supervisor; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.java.util.common.DateTimes; @@ -190,6 +191,21 @@ public class SupervisorManagerTest extends EasyMockSupport Assert.assertEquals(supervisorHistory, history); } + @Test + public void testGetSupervisorHistoryForId() + { + String id = "test-supervisor-1"; + List supervisorHistory = ImmutableList.of(); + + EasyMock.expect(metadataSupervisorManager.getAllForId(id)).andReturn(supervisorHistory); + replayAll(); + + List history = manager.getSupervisorHistoryForId(id); + verifyAll(); + + Assert.assertEquals(supervisorHistory, history); + } + @Test public void testGetSupervisorStatus() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 8c77ea0c97f..f3d794aa657 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -914,12 +914,11 @@ public class SupervisorResourceTest extends EasyMockSupport "v2" ) ); - Map> history = new HashMap<>(); - history.put("id1", versions1); - history.put("id2", versions2); EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(3); - EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history).times(3); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1")).andReturn(versions1).times(1); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2")).andReturn(versions2).times(1); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3")).andReturn(Collections.emptyList()).times(1); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( @@ -1011,13 +1010,12 @@ public class SupervisorResourceTest extends EasyMockSupport "tombstone" ) ); - Map> history = new HashMap<>(); - history.put("id1", versions1); - history.put("id2", versions2); - history.put("id3", versions3); EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(4); - EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history).times(4); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1")).andReturn(versions1).times(1); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2")).andReturn(versions2).times(1); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3")).andReturn(versions3).times(1); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4")).andReturn(Collections.emptyList()).times(1); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java index 9eb254ff52e..70b25d95275 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java @@ -33,6 +33,8 @@ public interface MetadataSupervisorManager Map> getAll(); + List getAllForId(String id); + /** * Return latest supervisors (both active and terminated) * diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java index 0480c451f9f..0c6904791d8 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import org.apache.druid.guice.ManageLifecycle; @@ -133,26 +134,9 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager public Pair map(int index, ResultSet r, StatementContext ctx) throws SQLException { - SupervisorSpec payload; - try { - payload = jsonMapper.readValue( - r.getBytes("payload"), - new TypeReference() - { - } - ); - } - catch (JsonParseException | JsonMappingException e) { - log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id")); - payload = null; - } - catch (IOException e) { - throw new RuntimeException(e); - } - return Pair.of( r.getString("spec_id"), - new VersionedSupervisorSpec(payload, r.getString("created_date")) + createVersionSupervisorSpecFromResponse(r) ); } } @@ -185,6 +169,60 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager ); } + @Override + public List getAllForId(String id) + { + return ImmutableList.copyOf( + dbi.withHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) + { + return handle.createQuery( + StringUtils.format( + "SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC", + getSupervisorsTable() + ) + ).bind("spec_id", id + ).map( + new ResultSetMapper() + { + @Override + public VersionedSupervisorSpec map(int index, ResultSet r, StatementContext ctx) + throws SQLException + { + return createVersionSupervisorSpecFromResponse(r); + } + } + ).list(); + } + } + ) + ); + } + + private VersionedSupervisorSpec createVersionSupervisorSpecFromResponse(ResultSet r) throws SQLException + { + SupervisorSpec payload; + try { + payload = jsonMapper.readValue( + r.getBytes("payload"), + new TypeReference() + { + } + ); + } + catch (JsonParseException | JsonMappingException e) { + log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id")); + payload = null; + } + catch (IOException e) { + throw new RuntimeException(e); + } + return new VersionedSupervisorSpec(payload, r.getString("created_date")); + } + @Override public Map getLatest() { diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index 9547387bbbf..7ea26960637 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -218,6 +218,40 @@ public class SQLMetadataSupervisorManagerTest Assert.assertEquals(data2rev2, ((TestSupervisorSpec) latestSpecs.get(supervisor2)).getData()); } + @Test + public void testInsertAndGetForId() + { + final String supervisor1 = "test-supervisor-1"; + final String supervisor2 = "test-supervisor-2"; + final Map data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1"); + final Map data1rev2 = ImmutableMap.of("key1-1", "value1-1-2", "key1-2", "value1-2-2"); + final Map data1rev3 = ImmutableMap.of("key1-1", "value1-1-3", "key1-2", "value1-2-3"); + final Map data2rev1 = ImmutableMap.of("key2-1", "value2-1-1", "key2-2", "value2-2-1"); + final Map data2rev2 = ImmutableMap.of("key2-3", "value2-3-2", "key2-4", "value2-4-2"); + + Assert.assertTrue(supervisorManager.getAllForId(supervisor1).isEmpty()); + Assert.assertTrue(supervisorManager.getAllForId(supervisor2).isEmpty()); + + // add 2 supervisors, with revisions + supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1)); + supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev2)); + supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev3)); + supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data2rev1)); + supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data2rev2)); + + List supervisor1Specs = supervisorManager.getAllForId(supervisor1); + List supervisor2Specs = supervisorManager.getAllForId(supervisor2); + + Assert.assertEquals(3, supervisor1Specs.size()); + Assert.assertEquals(2, supervisor2Specs.size()); + // make sure getAll() returns each spec in descending order + Assert.assertEquals(data1rev3, ((TestSupervisorSpec) supervisor1Specs.get(0).getSpec()).getData()); + Assert.assertEquals(data1rev2, ((TestSupervisorSpec) supervisor1Specs.get(1).getSpec()).getData()); + Assert.assertEquals(data1rev1, ((TestSupervisorSpec) supervisor1Specs.get(2).getSpec()).getData()); + Assert.assertEquals(data2rev2, ((TestSupervisorSpec) supervisor2Specs.get(0).getSpec()).getData()); + Assert.assertEquals(data2rev1, ((TestSupervisorSpec) supervisor2Specs.get(1).getSpec()).getData()); + } + @Test public void testSkipDeserializingBadSpecs() {