Optimize supervisor history retrieval for specific id (#11807)

Optimization. Fetch from the metadata store only the relevant history items for the requested supervisor id.
This commit is contained in:
David Bar 2021-10-19 11:38:25 +03:00 committed by GitHub
parent 9c15f938fd
commit 7d4841471f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 122 additions and 30 deletions

View File

@ -166,6 +166,11 @@ public class SupervisorManager
log.info("SupervisorManager stopped."); log.info("SupervisorManager stopped.");
} }
public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id)
{
return metadataSupervisorManager.getAllForId(id);
}
public Map<String, List<VersionedSupervisorSpec>> getSupervisorHistory() public Map<String, List<VersionedSupervisorSpec>> getSupervisorHistory()
{ {
return metadataSupervisorManager.getAll(); return metadataSupervisorManager.getAll();

View File

@ -393,9 +393,8 @@ public class SupervisorResource
{ {
return asLeaderWithSupervisorManager( return asLeaderWithSupervisorManager(
manager -> { manager -> {
Map<String, List<VersionedSupervisorSpec>> supervisorHistory = manager.getSupervisorHistory(); List<VersionedSupervisorSpec> historyForId = manager.getSupervisorHistoryForId(id);
Iterable<VersionedSupervisorSpec> historyForId = supervisorHistory.get(id); if (!historyForId.isEmpty()) {
if (historyForId != null) {
final List<VersionedSupervisorSpec> authorizedHistoryForId = final List<VersionedSupervisorSpec> authorizedHistoryForId =
Lists.newArrayList( Lists.newArrayList(
AuthorizationUtils.filterAuthorizedResources( AuthorizationUtils.filterAuthorizedResources(

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord.supervisor; package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
@ -190,6 +191,21 @@ public class SupervisorManagerTest extends EasyMockSupport
Assert.assertEquals(supervisorHistory, history); Assert.assertEquals(supervisorHistory, history);
} }
@Test
public void testGetSupervisorHistoryForId()
{
String id = "test-supervisor-1";
List<VersionedSupervisorSpec> supervisorHistory = ImmutableList.of();
EasyMock.expect(metadataSupervisorManager.getAllForId(id)).andReturn(supervisorHistory);
replayAll();
List<VersionedSupervisorSpec> history = manager.getSupervisorHistoryForId(id);
verifyAll();
Assert.assertEquals(supervisorHistory, history);
}
@Test @Test
public void testGetSupervisorStatus() public void testGetSupervisorStatus()
{ {

View File

@ -914,12 +914,11 @@ public class SupervisorResourceTest extends EasyMockSupport
"v2" "v2"
) )
); );
Map<String, List<VersionedSupervisorSpec>> history = new HashMap<>();
history.put("id1", versions1);
history.put("id2", versions2);
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(3); EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(3);
EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history).times(3); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1")).andReturn(versions1).times(1);
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2")).andReturn(versions2).times(1);
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3")).andReturn(Collections.emptyList()).times(1);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
@ -1011,13 +1010,12 @@ public class SupervisorResourceTest extends EasyMockSupport
"tombstone" "tombstone"
) )
); );
Map<String, List<VersionedSupervisorSpec>> history = new HashMap<>();
history.put("id1", versions1);
history.put("id2", versions2);
history.put("id3", versions3);
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(4); EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(4);
EasyMock.expect(supervisorManager.getSupervisorHistory()).andReturn(history).times(4); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1")).andReturn(versions1).times(1);
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2")).andReturn(versions2).times(1);
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3")).andReturn(versions3).times(1);
EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4")).andReturn(Collections.emptyList()).times(1);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(

View File

@ -33,6 +33,8 @@ public interface MetadataSupervisorManager
Map<String, List<VersionedSupervisorSpec>> getAll(); Map<String, List<VersionedSupervisorSpec>> getAll();
List<VersionedSupervisorSpec> getAllForId(String id);
/** /**
* Return latest supervisors (both active and terminated) * Return latest supervisors (both active and terminated)
* *

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ManageLifecycle;
@ -133,26 +134,9 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
public Pair<String, VersionedSupervisorSpec> map(int index, ResultSet r, StatementContext ctx) public Pair<String, VersionedSupervisorSpec> map(int index, ResultSet r, StatementContext ctx)
throws SQLException throws SQLException
{ {
SupervisorSpec payload;
try {
payload = jsonMapper.readValue(
r.getBytes("payload"),
new TypeReference<SupervisorSpec>()
{
}
);
}
catch (JsonParseException | JsonMappingException e) {
log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id"));
payload = null;
}
catch (IOException e) {
throw new RuntimeException(e);
}
return Pair.of( return Pair.of(
r.getString("spec_id"), r.getString("spec_id"),
new VersionedSupervisorSpec(payload, r.getString("created_date")) createVersionSupervisorSpecFromResponse(r)
); );
} }
} }
@ -185,6 +169,60 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
); );
} }
@Override
public List<VersionedSupervisorSpec> getAllForId(String id)
{
return ImmutableList.copyOf(
dbi.withHandle(
new HandleCallback<List<VersionedSupervisorSpec>>()
{
@Override
public List<VersionedSupervisorSpec> withHandle(Handle handle)
{
return handle.createQuery(
StringUtils.format(
"SELECT id, spec_id, created_date, payload FROM %1$s WHERE spec_id = :spec_id ORDER BY id DESC",
getSupervisorsTable()
)
).bind("spec_id", id
).map(
new ResultSetMapper<VersionedSupervisorSpec>()
{
@Override
public VersionedSupervisorSpec map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
return createVersionSupervisorSpecFromResponse(r);
}
}
).list();
}
}
)
);
}
private VersionedSupervisorSpec createVersionSupervisorSpecFromResponse(ResultSet r) throws SQLException
{
SupervisorSpec payload;
try {
payload = jsonMapper.readValue(
r.getBytes("payload"),
new TypeReference<SupervisorSpec>()
{
}
);
}
catch (JsonParseException | JsonMappingException e) {
log.warn("Failed to deserialize payload for spec_id[%s]", r.getString("spec_id"));
payload = null;
}
catch (IOException e) {
throw new RuntimeException(e);
}
return new VersionedSupervisorSpec(payload, r.getString("created_date"));
}
@Override @Override
public Map<String, SupervisorSpec> getLatest() public Map<String, SupervisorSpec> getLatest()
{ {

View File

@ -218,6 +218,40 @@ public class SQLMetadataSupervisorManagerTest
Assert.assertEquals(data2rev2, ((TestSupervisorSpec) latestSpecs.get(supervisor2)).getData()); Assert.assertEquals(data2rev2, ((TestSupervisorSpec) latestSpecs.get(supervisor2)).getData());
} }
@Test
public void testInsertAndGetForId()
{
final String supervisor1 = "test-supervisor-1";
final String supervisor2 = "test-supervisor-2";
final Map<String, String> data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1");
final Map<String, String> data1rev2 = ImmutableMap.of("key1-1", "value1-1-2", "key1-2", "value1-2-2");
final Map<String, String> data1rev3 = ImmutableMap.of("key1-1", "value1-1-3", "key1-2", "value1-2-3");
final Map<String, String> data2rev1 = ImmutableMap.of("key2-1", "value2-1-1", "key2-2", "value2-2-1");
final Map<String, String> data2rev2 = ImmutableMap.of("key2-3", "value2-3-2", "key2-4", "value2-4-2");
Assert.assertTrue(supervisorManager.getAllForId(supervisor1).isEmpty());
Assert.assertTrue(supervisorManager.getAllForId(supervisor2).isEmpty());
// add 2 supervisors, with revisions
supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1));
supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev2));
supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev3));
supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data2rev1));
supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data2rev2));
List<VersionedSupervisorSpec> supervisor1Specs = supervisorManager.getAllForId(supervisor1);
List<VersionedSupervisorSpec> supervisor2Specs = supervisorManager.getAllForId(supervisor2);
Assert.assertEquals(3, supervisor1Specs.size());
Assert.assertEquals(2, supervisor2Specs.size());
// make sure getAll() returns each spec in descending order
Assert.assertEquals(data1rev3, ((TestSupervisorSpec) supervisor1Specs.get(0).getSpec()).getData());
Assert.assertEquals(data1rev2, ((TestSupervisorSpec) supervisor1Specs.get(1).getSpec()).getData());
Assert.assertEquals(data1rev1, ((TestSupervisorSpec) supervisor1Specs.get(2).getSpec()).getData());
Assert.assertEquals(data2rev2, ((TestSupervisorSpec) supervisor2Specs.get(0).getSpec()).getData());
Assert.assertEquals(data2rev1, ((TestSupervisorSpec) supervisor2Specs.get(1).getSpec()).getData());
}
@Test @Test
public void testSkipDeserializingBadSpecs() public void testSkipDeserializingBadSpecs()
{ {