From 9a9038c7aeabe42724015fa240f13c6ca1aca0e2 Mon Sep 17 00:00:00 2001
From: TSFenwick
Date: Thu, 27 Jul 2023 15:34:44 -0700
Subject: [PATCH] Speed up kill tasks by deleting segments in batch (#14131)

* allow for batched delete of segments instead of deleting segment data one by one

  Create a new batch delete method in DataSegmentKiller that has default
  functionality of iterating through all segments and calling delete on them.
  This will enable a slow rollout, letting other deep storage implementations
  move to a batched delete on their own time.

* cleanup batch delete segments
* batch delete with the omni data deleter; cleaned up code, just need to add tests and docs for this functionality
* update javadoc to explain how it will try to use batch if the function is overridden
* rename killBatch to kill; add unit tests
* add OmniDataSegmentKillerTest for deleting multiple segments at a time; fix checkstyle
* explain test peculiarity better
* clean up batch kill in s3
* remove unused return value; clean up comments and fix checkstyle
* default to batch delete; more specific javadocs; list segments that couldn't be deleted if there was a client error or server error
* simplify error handling
* add tests where an exception is thrown when killing multiple s3 segments
* add test for failing to delete two calls with the s3 client
* fix javadoc for kill(List<DataSegment> segments); clean up tests; remove feature flag
* fix typo in javadocs
* fix test failure
* fix checkstyle and improve tests
* fix intellij inspections issues
* address comments; make delete multiple segments not assume same bucket
* fix test errors
* better grammar and punctuation; fix test; better logging for exception
* remove unused code
* avoid extra arraylist instantiation
* fix broken test
* fix broken test
* fix tests to use assert.throws
---
 .../druid/storage/s3/S3DataSegmentKiller.java | 119 ++++++++++-
 .../storage/s3/S3DataSegmentKillerTest.java   | 192 ++++++++++++++++++
 .../common/task/KillUnusedSegmentsTask.java   |   4 +-
 .../segment/loading/DataSegmentKiller.java    |  24 +++
 .../loading/OmniDataSegmentKiller.java        |  18 ++
 .../loading/OmniDataSegmentKillerTest.java    |  45 ++++
 6 files changed, 396 insertions(+), 6 deletions(-)
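The core of the S3 change below is replacing one delete call per object with S3's multi-object delete API, chunked to the 1000-key request limit. As a minimal, self-contained sketch of that pattern (assuming a plain AmazonS3 client rather than Druid's ServerSideEncryptingAmazonS3 wrapper; BatchDeleteSketch and its method are hypothetical names, not part of the patch):

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.DeleteObjectsRequest;
    import com.google.common.collect.Lists;
    import java.util.List;
    import java.util.stream.Collectors;

    class BatchDeleteSketch
    {
      // AWS rejects multi-object delete requests containing more than 1000 keys.
      private static final int MAX_KEYS_PER_REQUEST = 1000;

      static void deleteAll(AmazonS3 s3, String bucket, List<String> keys)
      {
        for (List<String> chunk : Lists.partition(keys, MAX_KEYS_PER_REQUEST)) {
          DeleteObjectsRequest request = new DeleteObjectsRequest(bucket)
              .withKeys(chunk.stream().map(DeleteObjectsRequest.KeyVersion::new).collect(Collectors.toList()))
              .withQuiet(true); // quiet mode: the response lists only the keys that failed
          s3.deleteObjects(request);
        }
      }
    }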
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
index d06583630d3..cfd8c47ceac 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
@@ -20,8 +20,11 @@
 package org.apache.druid.storage.s3;
 
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.MapUtils;
@@ -31,7 +34,11 @@ import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.timeline.DataSegment;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 /**
  *
  */
@@ -40,12 +47,15 @@ public class S3DataSegmentKiller implements DataSegmentKiller
 {
   private static final Logger log = new Logger(S3DataSegmentKiller.class);
 
+  // AWS allows at most 1000 objects to be deleted in a single multi-object delete request.
+  private static final int MAX_MULTI_OBJECT_DELETE_SIZE = 1000;
+
   /**
    * Any implementation of DataSegmentKiller is initialized when an ingestion job starts if the extension is loaded,
    * even when the implementation of DataSegmentKiller is not used. As a result, if we have a s3 client instead
    * of a supplier of it, it can cause unnecessary config validation for s3 even when it's not used at all.
    * To perform the config validation only when it is actually used, we use a supplier.
-   * 
+   *
    * See OmniDataSegmentKiller for how DataSegmentKillers are initialized.
    */
   private final Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier;
@@ -64,13 +74,116 @@ public class S3DataSegmentKiller implements DataSegmentKiller
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    if (segments.isEmpty()) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+    }
+
+    // Create a map of bucket to keys to delete.
+    Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete = new HashMap<>();
+    for (DataSegment segment : segments) {
+      String s3Bucket = MapUtils.getString(segment.getLoadSpec(), S3DataSegmentPuller.BUCKET);
+      String path = MapUtils.getString(segment.getLoadSpec(), S3DataSegmentPuller.KEY);
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = bucketToKeysToDelete.computeIfAbsent(
+          s3Bucket,
+          k -> new ArrayList<>()
+      );
+      keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
+      keysToDelete.add(new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
+    }
+
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+    boolean shouldThrowException = false;
+    for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys : bucketToKeysToDelete.entrySet()) {
+      String s3Bucket = bucketToKeys.getKey();
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = bucketToKeys.getValue();
+      boolean hadException = deleteKeysForBucket(s3Client, s3Bucket, keysToDelete);
+      if (hadException) {
+        shouldThrowException = true;
+      }
+    }
+    if (shouldThrowException) {
+      // The exception error message gets cut off without providing any details; look at the task logs for more
+      // details. This is a shortcut to handle the many different ways deletion could potentially fail and still
+      // report them reasonably.
+      throw new SegmentLoadingException(
+          "Couldn't delete segments from S3. See the task logs for more details."
+      );
+    }
+  }
+
+  /**
+   * Deletes the given keys from a single S3 bucket.
+   *
+   * @param s3Client     client used to communicate with S3
+   * @param s3Bucket     the bucket where the keys exist
+   * @param keysToDelete the keys to delete
+   * @return true if there was an issue deleting one or more keys, false if all deletions succeeded
+   */
+  private boolean deleteKeysForBucket(
+      ServerSideEncryptingAmazonS3 s3Client,
+      String s3Bucket,
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete
+  )
+  {
+    boolean hadException = false;
+    DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(s3Bucket);
+    deleteObjectsRequest.setQuiet(true);
+    List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = Lists.partition(
+        keysToDelete,
+        MAX_MULTI_OBJECT_DELETE_SIZE
+    );
+    for (List<DeleteObjectsRequest.KeyVersion> chunkOfKeys : keysChunks) {
+      List<String> keysToDeleteStrings = chunkOfKeys.stream().map(
+          DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+      try {
+        deleteObjectsRequest.setKeys(chunkOfKeys);
+        log.info(
+            "Removing from bucket [%s] the following index files: [%s].",
+            s3Bucket,
+            keysToDeleteStrings
+        );
+        s3Client.deleteObjects(deleteObjectsRequest);
+      }
+      catch (MultiObjectDeleteException e) {
+        hadException = true;
+        Map<String, List<String>> errorToKeys = new HashMap<>();
+        for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
+          errorToKeys.computeIfAbsent(error.getMessage(), k -> new ArrayList<>()).add(error.getKey());
+        }
+        errorToKeys.forEach((key, value) -> log.error(
+            "Unable to delete from bucket [%s] the following keys [%s], because [%s]",
+            s3Bucket,
+            String.join(", ", value),
+            key
+        ));
+      }
+      catch (AmazonServiceException e) {
+        hadException = true;
+        log.noStackTrace().warn(e,
+            "Unable to delete from bucket [%s] the following keys [%s]",
+            s3Bucket,
+            chunkOfKeys.stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.joining(", "))
+        );
+      }
+    }
+    return hadException;
+  }
+
   @Override
   public void kill(DataSegment segment) throws SegmentLoadingException
   {
     try {
       Map<String, Object> loadSpec = segment.getLoadSpec();
-      String s3Bucket = MapUtils.getString(loadSpec, "bucket");
-      String s3Path = MapUtils.getString(loadSpec, "key");
+      String s3Bucket = MapUtils.getString(loadSpec, S3DataSegmentPuller.BUCKET);
+      String s3Path = MapUtils.getString(loadSpec, S3DataSegmentPuller.KEY);
 
       String s3DescriptorPath = DataSegmentKiller.descriptorPath(s3Path);
 
       final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
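With quiet mode enabled, partial failures surface as a MultiObjectDeleteException rather than a normal result, and the patch logs its per-key errors grouped by error message. That grouping idiom in isolation, as a rough sketch (the DeleteErrorGrouping class and groupErrors helper are hypothetical names):

    import com.amazonaws.services.s3.model.MultiObjectDeleteException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class DeleteErrorGrouping
    {
      // Collapse per-key errors into message -> keys, e.g. "Access Denied" -> [key1/, key2/],
      // so a 1000-key failure produces a handful of log lines instead of 1000.
      static Map<String, List<String>> groupErrors(MultiObjectDeleteException e)
      {
        Map<String, List<String>> errorToKeys = new HashMap<>();
        for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
          errorToKeys.computeIfAbsent(error.getMessage(), k -> new ArrayList<>()).add(error.getKey());
        }
        return errorToKeys;
      }
    }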
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java
index f5ccacdf1f1..a705771a6b0 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java
@@ -19,14 +19,20 @@
 package org.apache.druid.storage.s3;
 
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
@@ -42,7 +48,10 @@ import java.net.URI;
 public class S3DataSegmentKillerTest extends EasyMockSupport
 {
   private static final String KEY_1 = "key1";
+  private static final String KEY_1_PATH = KEY_1 + "/";
+  private static final String KEY_1_DESCRIPTOR_PATH = KEY_1_PATH + "descriptor.json";
   private static final String KEY_2 = "key2";
+  private static final String KEY_2_PATH = KEY_2 + "/";
   private static final String TEST_BUCKET = "test_bucket";
   private static final String TEST_PREFIX = "test_prefix";
   private static final URI PREFIX_URI = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, TEST_PREFIX));
@@ -52,6 +61,30 @@ public class S3DataSegmentKillerTest extends EasyMockSupport
   private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException());
   private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException());
 
+  private static final DataSegment DATA_SEGMENT_1 = new DataSegment(
+      "test",
+      Intervals.of("2015-04-12/2015-04-13"),
+      "1",
+      ImmutableMap.of("bucket", TEST_BUCKET, "key", KEY_1_PATH),
+      null,
+      null,
+      NoneShardSpec.instance(),
+      0,
+      1
+  );
+
+  private static final DataSegment DATA_SEGMENT_2 = new DataSegment(
+      "test",
+      Intervals.of("2015-04-13/2015-04-14"),
+      "1",
+      ImmutableMap.of("bucket", TEST_BUCKET, "key", KEY_2_PATH),
+      null,
+      null,
+      NoneShardSpec.instance(),
+      0,
+      1
+  );
+
   @Mock
   private ServerSideEncryptingAmazonS3 s3Client;
   @Mock
@@ -213,4 +246,163 @@ public class S3DataSegmentKillerTest extends EasyMockSupport
     Assert.assertTrue(ioExceptionThrown);
     EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig);
   }
+
+  @Test
+  public void test_kill_singleSegment_doesntexist_passes() throws SegmentLoadingException
+  {
+    EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_PATH)).andReturn(false);
+    EasyMock.expectLastCall().once();
+    EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH)).andReturn(false);
+    EasyMock.expectLastCall().once();
+    EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
+
+    segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
+    segmentKiller.kill(DATA_SEGMENT_1);
+  }
+
+  @Test
+  public void test_kill_singleSegment_exists_passes() throws SegmentLoadingException
+  {
+    EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_PATH)).andReturn(true);
+    EasyMock.expectLastCall().once();
+
+    s3Client.deleteObject(TEST_BUCKET, KEY_1_PATH);
+    EasyMock.expectLastCall().andVoid();
+
+    EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH)).andReturn(true);
+    EasyMock.expectLastCall().once();
+
+    s3Client.deleteObject(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH);
+    EasyMock.expectLastCall().andVoid();
+
+    EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
+
+    segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
+    segmentKiller.kill(DATA_SEGMENT_1);
+  }
+
+  @Test
+  public void test_kill_listOfOneSegment() throws SegmentLoadingException
+  {
+    EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_PATH)).andReturn(true);
+    EasyMock.expectLastCall().once();
+
+    s3Client.deleteObject(TEST_BUCKET, KEY_1_PATH);
+    EasyMock.expectLastCall().andVoid();
+
+    EasyMock.expect(s3Client.doesObjectExist(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH)).andReturn(true);
+    EasyMock.expectLastCall().once();
+
+    s3Client.deleteObject(TEST_BUCKET, KEY_1_DESCRIPTOR_PATH);
+    EasyMock.expectLastCall().andVoid();
+
+    EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
+    segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
+    segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1));
+  }
+
+  @Test
+  public void test_kill_listOfNoSegments() throws SegmentLoadingException
+  {
+    EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
+    segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
+    segmentKiller.kill(ImmutableList.of());
+    // EasyMock fails with an assertion error on any unexpected method call, so no expectations are needed here:
+    // killing an empty list should not interact with the mocks at all.
+  }
+
+  @Test
+  public void test_kill_listOfSegments() throws SegmentLoadingException
+  {
+    DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
+    deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_1_PATH);
+    // Matching on request equality is brittle, so accept any DeleteObjectsRequest here.
+    s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
+    EasyMock.expectLastCall().andVoid().times(2);
+
+    EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
+    segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
+    segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_1));
+  }
+
+  @Test
+  public void test_kill_listOfSegments_multiDeleteExceptionIsThrown()
+  {
+    DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
+    deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_2_PATH);
+    // Matching on request equality is brittle, so accept any DeleteObjectsRequest here.
+    s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
+    MultiObjectDeleteException.DeleteError deleteError = new MultiObjectDeleteException.DeleteError();
+    deleteError.setKey(KEY_1_PATH);
+    MultiObjectDeleteException multiObjectDeleteException = new MultiObjectDeleteException(
+        ImmutableList.of(deleteError),
+        ImmutableList.of());
+    EasyMock.expectLastCall().andThrow(multiObjectDeleteException).once();
+
+    EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
+    segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
+
+    SegmentLoadingException thrown = Assert.assertThrows(
+        SegmentLoadingException.class,
+        () -> segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_2))
+    );
+    Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
+  }
+
+  @Test
+  public void test_kill_listOfSegments_multiDeleteExceptionIsThrownMultipleTimes()
+  {
+    DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
+    deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_2_PATH);
+    // Matching on request equality is brittle, so accept any DeleteObjectsRequest here.
+    s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
+    MultiObjectDeleteException.DeleteError deleteError = new MultiObjectDeleteException.DeleteError();
+    deleteError.setKey(KEY_1_PATH);
+    MultiObjectDeleteException multiObjectDeleteException = new MultiObjectDeleteException(
+        ImmutableList.of(deleteError),
+        ImmutableList.of());
+    EasyMock.expectLastCall().andThrow(multiObjectDeleteException).once();
+    MultiObjectDeleteException.DeleteError deleteError2 = new MultiObjectDeleteException.DeleteError();
+    deleteError2.setKey(KEY_2_PATH);
+    MultiObjectDeleteException multiObjectDeleteException2 = new MultiObjectDeleteException(
+        ImmutableList.of(deleteError2),
+        ImmutableList.of());
+    EasyMock.expectLastCall().andThrow(multiObjectDeleteException2).once();
+
+    EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
+    segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
+    ImmutableList.Builder<DataSegment> builder = ImmutableList.builder();
+    // The limit is 1000 keys per chunk, and each segment contributes 2 keys (index file and descriptor),
+    // so 501 segments produce 1002 keys and therefore 2 delete calls via the s3Client.
+    for (int ii = 0; ii < 501; ii++) {
+      builder.add(DATA_SEGMENT_1);
+    }
+    SegmentLoadingException thrown = Assert.assertThrows(
+        SegmentLoadingException.class,
+        () -> segmentKiller.kill(builder.build())
+    );
+
+    Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
+  }
+
+  @Test
+  public void test_kill_listOfSegments_amazonServiceExceptionExceptionIsThrown()
+  {
+    DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET);
+    deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_2_PATH);
+    // Matching on request equality is brittle, so accept any DeleteObjectsRequest here.
+    s3Client.deleteObjects(EasyMock.anyObject(DeleteObjectsRequest.class));
+    EasyMock.expectLastCall().andThrow(new AmazonServiceException("")).once();
+
+    EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig);
+    segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig);
+
+    SegmentLoadingException thrown = Assert.assertThrows(
+        SegmentLoadingException.class,
+        () -> segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_2))
+    );
+    Assert.assertEquals("Couldn't delete segments from S3. See the task logs for more details.", thrown.getMessage());
+  }
 }
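Since each segment contributes two keys (the index file and its descriptor), the expected number of delete requests per bucket is the ceiling of 2N / 1000; the 501-segment test above exercises exactly the two-request case. A quick sketch of that arithmetic (RequestCountSketch and its method are hypothetical names):

    class RequestCountSketch
    {
      // Expected multi-object delete requests per bucket, assuming
      // two keys (index file + descriptor.json) for each segment.
      static int expectedRequests(int segmentCount)
      {
        int keys = segmentCount * 2;
        return (keys + 999) / 1000; // ceiling division against the 1000-key limit
      }
    }

For example, expectedRequests(501) returns 2, matching the two thrown-exception expectations in that test.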
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 29cdea6c73a..0dbceaa7e78 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -129,9 +129,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
     // Kill segments
     toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
 
-    for (DataSegment segment : unusedSegments) {
-      toolbox.getDataSegmentKiller().kill(segment);
-    }
+    toolbox.getDataSegmentKiller().kill(unusedSegments);
 
     return TaskStatus.success(getId());
   }
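The interface change that makes this safe to roll out incrementally is a Java default method: implementations that never override the list variant silently keep the old one-segment-at-a-time behavior. The shape of the pattern, reduced to its essentials (Killer is a hypothetical stand-in, not the Druid interface):

    import java.util.List;

    interface Killer
    {
      void kill(String item) throws Exception;

      // Batch entry point: bulk-capable implementations override this;
      // everyone else inherits the per-item fallback below.
      default void kill(List<String> items) throws Exception
      {
        for (String item : items) {
          kill(item);
        }
      }
    }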

diff --git a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
index 6237a2e06db..cca09190481 100644
--- a/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
+++ b/processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java
@@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.DataSegment;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * DataSegmentKiller knows how to kill segments from the Druid system.
@@ -54,6 +55,29 @@ public interface DataSegmentKiller
    */
   void kill(DataSegment segment) throws SegmentLoadingException;
 
+  /**
+   * Kills a list of segments from deep storage. The default implementation calls kill on the segments in a loop.
+   * Implementers of this interface can leverage batch/bulk deletes to be more efficient. It is preferable to attempt
+   * to delete all segments even if there is an issue deleting a single one. This is left to implementers because
+   * wrapping the default iterative kill in a try/catch can be problematic: if the deep storage client is unable to
+   * authenticate itself, the resulting SegmentLoadingException does not encode enough information to understand why
+   * it failed.
+   *
+   * If one or more segments do not exist in deep storage, the method should not throw an exception.
+   *
+   * This version of kill must **NOT** require additional permissions on the deep storage beyond what
+   * {@link #kill(DataSegment)} requires.
+   *
+   * @param segments The list of segments to kill.
+   * @throws SegmentLoadingException If there is an exception during deletion, such as a segment in the list that
+   *                                 could not be completely removed.
+   */
+  default void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    for (DataSegment segment : segments) {
+      kill(segment);
+    }
+  }
+
   /**
    * A more stoic killer who doesn't throw a tantrum if things get messy. Use when killing segments for best-effort
    * cleanup.
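OmniDataSegmentKiller resolves a concrete killer from each segment's load-spec type, so to preserve batching it first groups segments by their resolved killer and then issues one list-based kill per group. The grouping idiom on its own, as a generic sketch (Grouping and groupBy are hypothetical names):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    class Grouping
    {
      // The computeIfAbsent idiom used by the omni killer: bucket each value
      // under the key its resolver produces, then act once per bucket.
      static <K, V> Map<K, List<V>> groupBy(List<V> values, Function<V, K> resolver)
      {
        Map<K, List<V>> groups = new HashMap<>();
        for (V value : values) {
          groups.computeIfAbsent(resolver.apply(value), k -> new ArrayList<>()).add(value);
        }
        return groups;
      }
    }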
diff --git a/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java b/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
index c9e7a34d331..b1e26d72fdc 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/OmniDataSegmentKiller.java
@@ -29,7 +29,9 @@ import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -51,6 +53,22 @@ public class OmniDataSegmentKiller implements DataSegmentKiller
     }
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    Map<DataSegmentKiller, List<DataSegment>> killersToSegments = new HashMap<>();
+    for (DataSegment segment : segments) {
+      DataSegmentKiller dataSegmentKiller = getKiller(segment);
+      if (dataSegmentKiller != null) {
+        List<DataSegment> segmentsList = killersToSegments.computeIfAbsent(dataSegmentKiller, x -> new ArrayList<>());
+        segmentsList.add(segment);
+      }
+    }
+    for (Map.Entry<DataSegmentKiller, List<DataSegment>> killerAndSegments : killersToSegments.entrySet()) {
+      killerAndSegments.getKey().kill(killerAndSegments.getValue());
+    }
+  }
+
   @Override
   public void kill(DataSegment segment) throws SegmentLoadingException
   {
diff --git a/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java b/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
index 721c1ccda89..2038f3a8d7e 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/OmniDataSegmentKillerTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Module;
 import com.google.inject.multibindings.MapBinder;
 import org.apache.druid.guice.Binders;
 import org.apache.druid.guice.GuiceInjectors;
@@ -35,6 +36,12 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 
 public class OmniDataSegmentKillerTest
 {
@@ -100,6 +107,19 @@ public class OmniDataSegmentKillerTest
     );
   }
 
+  private static Injector createInjectorFromMap(@NotNull Map<String, DataSegmentKiller> killerMap)
+  {
+    ImmutableList.Builder<Module> moduleListBuilder = ImmutableList.builder();
+    for (Map.Entry<String, DataSegmentKiller> typeToKiller : killerMap.entrySet()) {
+      moduleListBuilder.add(binder -> {
+        MapBinder<String, DataSegmentKiller> mapBinder = Binders.dataSegmentKillerBinder(binder);
+        mapBinder.addBinding(typeToKiller.getKey()).toInstance(typeToKiller.getValue());
+      });
+    }
+
+    return GuiceInjectors.makeStartupInjectorWithModules(moduleListBuilder.build());
+  }
+
   @Test
   public void testKillTombstone() throws Exception
   {
@@ -119,6 +139,31 @@ public class OmniDataSegmentKillerTest
     segmentKiller.kill(tombstone);
   }
 
+  @Test
+  public void testKillMultipleSegmentsWithType() throws SegmentLoadingException
+  {
+    final DataSegmentKiller killerSane = Mockito.mock(DataSegmentKiller.class);
+    final DataSegmentKiller killerSaneTwo = Mockito.mock(DataSegmentKiller.class);
+    final DataSegment segment1 = Mockito.mock(DataSegment.class);
+    final DataSegment segment2 = Mockito.mock(DataSegment.class);
+    final DataSegment segment3 = Mockito.mock(DataSegment.class);
+    Mockito.when(segment1.isTombstone()).thenReturn(false);
+    Mockito.when(segment1.getLoadSpec()).thenReturn(ImmutableMap.of("type", "sane"));
+    Mockito.when(segment2.isTombstone()).thenReturn(false);
+    Mockito.when(segment2.getLoadSpec()).thenReturn(ImmutableMap.of("type", "sane"));
+    Mockito.when(segment3.isTombstone()).thenReturn(false);
+    Mockito.when(segment3.getLoadSpec()).thenReturn(ImmutableMap.of("type", "sane_2"));
+
+    final Injector injector = createInjectorFromMap(ImmutableMap.of("sane", killerSane, "sane_2", killerSaneTwo));
+    final OmniDataSegmentKiller segmentKiller = injector.getInstance(OmniDataSegmentKiller.class);
+    segmentKiller.kill(ImmutableList.of(segment1, segment2, segment3));
+
+    Mockito.verify(killerSane, Mockito.times(1))
+           .kill((List<DataSegment>) argThat(containsInAnyOrder(segment1, segment2)));
+    Mockito.verify(killerSaneTwo, Mockito.times(1))
+           .kill((List<DataSegment>) argThat(containsInAnyOrder(segment3)));
+  }
+
   @LazySingleton
   private static class BadSegmentKiller implements DataSegmentKiller
   {