From 27ab23ef448cbb291a6f26dd5a72396ce7e7fbde Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 1 Dec 2016 07:49:28 -0800 Subject: [PATCH] Don't update segment metadata if archive doesn't move anything (#3476) * Don't update segment metadata if archive doesn't move anything * Fix restore task to handle potential null values * Don't try to update empty metadata * Address review comments * Move to druid-io java-util --- .../segment/loading/DataSegmentArchiver.java | 27 ++- .../storage/s3/S3DataSegmentArchiver.java | 36 +++- .../java/io/druid/storage/s3/S3LoadSpec.java | 14 +- .../storage/s3/S3DataSegmentArchiverTest.java | 168 ++++++++++++++++++ .../indexing/common/task/ArchiveTask.java | 6 +- .../indexing/common/task/RestoreTask.java | 84 +++++---- 6 files changed, 287 insertions(+), 48 deletions(-) create mode 100644 extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentArchiverTest.java diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentArchiver.java b/api/src/main/java/io/druid/segment/loading/DataSegmentArchiver.java index 869f0591dd6..d6753296b8e 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentArchiver.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentArchiver.java @@ -21,8 +21,31 @@ package io.druid.segment.loading; import io.druid.timeline.DataSegment; +import javax.annotation.Nullable; + public interface DataSegmentArchiver { - public DataSegment archive(DataSegment segment) throws SegmentLoadingException; - public DataSegment restore(DataSegment segment) throws SegmentLoadingException; + /** + * Perform an archive task on the segment and return the resulting segment or null if there was no action needed. + * + * @param segment The source segment + * + * @return The segment after archiving or `null` if there was no archiving performed. + * + * @throws SegmentLoadingException on error + */ + @Nullable + DataSegment archive(DataSegment segment) throws SegmentLoadingException; + + /** + * Perform the restore from an archived segment and return the resulting segment or null if there was no action + * + * @param segment The source (archived) segment + * + * @return The segment after it has been unarchived + * + * @throws SegmentLoadingException on error + */ + @Nullable + DataSegment restore(DataSegment segment) throws SegmentLoadingException; } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java index cf7d9fb4d06..d7bc1b2d491 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java @@ -19,9 +19,13 @@ package io.druid.storage.s3; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import io.druid.guice.annotations.Json; import io.druid.segment.loading.DataSegmentArchiver; +import io.druid.segment.loading.LoadSpec; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -31,15 +35,18 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg { private final S3DataSegmentArchiverConfig archiveConfig; private final S3DataSegmentPusherConfig restoreConfig; + private final ObjectMapper mapper; @Inject public S3DataSegmentArchiver( - RestS3Service s3Client, - S3DataSegmentArchiverConfig archiveConfig, - S3DataSegmentPusherConfig restoreConfig + @Json ObjectMapper mapper, + RestS3Service s3Client, + S3DataSegmentArchiverConfig archiveConfig, + S3DataSegmentPusherConfig restoreConfig ) { super(s3Client, restoreConfig); + this.mapper = mapper; this.archiveConfig = archiveConfig; this.restoreConfig = restoreConfig; } @@ -50,13 +57,17 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg String targetS3Bucket = archiveConfig.getArchiveBucket(); String targetS3BaseKey = archiveConfig.getArchiveBaseKey(); - return move( + final DataSegment archived = move( segment, ImmutableMap.of( "bucket", targetS3Bucket, "baseKey", targetS3BaseKey ) ); + if (sameLoadSpec(segment, archived)) { + return null; + } + return archived; } @Override @@ -65,12 +76,27 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg String targetS3Bucket = restoreConfig.getBucket(); String targetS3BaseKey = restoreConfig.getBaseKey(); - return move( + final DataSegment restored = move( segment, ImmutableMap.of( "bucket", targetS3Bucket, "baseKey", targetS3BaseKey ) ); + + if (sameLoadSpec(segment, restored)) { + return null; + } + return restored; + } + + boolean sameLoadSpec(DataSegment s1, DataSegment s2) + { + final S3LoadSpec s1LoadSpec = (S3LoadSpec) mapper.convertValue(s1.getLoadSpec(), LoadSpec.class); + final S3LoadSpec s2LoadSpec = (S3LoadSpec) mapper.convertValue(s2.getLoadSpec(), LoadSpec.class); + return Objects.equal(s1LoadSpec.getBucket(), s2LoadSpec.getBucket()) && Objects.equal( + s1LoadSpec.getKey(), + s2LoadSpec.getKey() + ); } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java index e484fdb223c..8fead762a55 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java @@ -35,9 +35,7 @@ import java.io.File; @JsonTypeName(S3StorageDruidModule.SCHEME) public class S3LoadSpec implements LoadSpec { - @JsonProperty(S3DataSegmentPuller.BUCKET) private final String bucket; - @JsonProperty(S3DataSegmentPuller.KEY) private final String key; private final S3DataSegmentPuller puller; @@ -61,4 +59,16 @@ public class S3LoadSpec implements LoadSpec { return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size()); } + + @JsonProperty(S3DataSegmentPuller.BUCKET) + public String getBucket() + { + return bucket; + } + + @JsonProperty(S3DataSegmentPuller.KEY) + public String getKey() + { + return key; + } } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentArchiverTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentArchiverTest.java new file mode 100644 index 00000000000..570c5e1f87c --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentArchiverTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.storage.s3; + +import com.fasterxml.jackson.databind.BeanProperty; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; +import org.easymock.EasyMock; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Map; + +public class S3DataSegmentArchiverTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper() + .setInjectableValues(new InjectableValues() + { + @Override + public Object findInjectableValue( + Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance + ) + { + return PULLER; + } + }).registerModule(new SimpleModule("s3-archive-test-module").registerSubtypes(S3LoadSpec.class)); + private static final S3DataSegmentArchiverConfig ARCHIVER_CONFIG = new S3DataSegmentArchiverConfig() + { + @Override + public String getArchiveBucket() + { + return "archive_bucket"; + } + + @Override + public String getArchiveBaseKey() + { + return "archive_base_key"; + } + }; + private static final S3DataSegmentPusherConfig PUSHER_CONFIG = new S3DataSegmentPusherConfig(); + private static final RestS3Service S3_SERVICE = EasyMock.createStrictMock(RestS3Service.class); + private static final S3DataSegmentPuller PULLER = new S3DataSegmentPuller(S3_SERVICE); + private static final DataSegment SOURCE_SEGMENT = DataSegment + .builder() + .binaryVersion(1) + .dataSource("dataSource") + .dimensions(ImmutableList.of()) + .interval(Interval.parse("2015/2016")) + .version("version") + .loadSpec(ImmutableMap.of( + "type", + S3StorageDruidModule.SCHEME, + S3DataSegmentPuller.BUCKET, + "source_bucket", + S3DataSegmentPuller.KEY, + "source_key" + )) + .build(); + + @BeforeClass + public static void setUpStatic() + { + PUSHER_CONFIG.setBaseKey("push_base"); + PUSHER_CONFIG.setBucket("push_bucket"); + } + + @Test + public void testSimpleArchive() throws Exception + { + final DataSegment archivedSegment = SOURCE_SEGMENT + .withLoadSpec(ImmutableMap.of( + "type", + S3StorageDruidModule.SCHEME, + S3DataSegmentPuller.BUCKET, + ARCHIVER_CONFIG.getArchiveBucket(), + S3DataSegmentPuller.KEY, + ARCHIVER_CONFIG.getArchiveBaseKey() + "archived" + )); + final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG) + { + @Override + public DataSegment move(DataSegment segment, Map targetLoadSpec) throws SegmentLoadingException + { + return archivedSegment; + } + }; + Assert.assertEquals(archivedSegment, archiver.archive(SOURCE_SEGMENT)); + } + + @Test + public void testSimpleArchiveDoesntMove() throws Exception + { + final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG) + { + @Override + public DataSegment move(DataSegment segment, Map targetLoadSpec) throws SegmentLoadingException + { + return SOURCE_SEGMENT; + } + }; + Assert.assertNull(archiver.archive(SOURCE_SEGMENT)); + } + + @Test + public void testSimpleRestore() throws Exception + { + final DataSegment archivedSegment = SOURCE_SEGMENT + .withLoadSpec(ImmutableMap.of( + "type", + S3StorageDruidModule.SCHEME, + S3DataSegmentPuller.BUCKET, + ARCHIVER_CONFIG.getArchiveBucket(), + S3DataSegmentPuller.KEY, + ARCHIVER_CONFIG.getArchiveBaseKey() + "archived" + )); + final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG) + { + @Override + public DataSegment move(DataSegment segment, Map targetLoadSpec) throws SegmentLoadingException + { + return archivedSegment; + } + }; + Assert.assertEquals(archivedSegment, archiver.restore(SOURCE_SEGMENT)); + } + + @Test + public void testSimpleRestoreDoesntMove() throws Exception + { + final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG) + { + @Override + public DataSegment move(DataSegment segment, Map targetLoadSpec) throws SegmentLoadingException + { + return SOURCE_SEGMENT; + } + }; + Assert.assertNull(archiver.restore(SOURCE_SEGMENT)); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java index 316f26ba31d..ffb9ddfe536 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java @@ -97,7 +97,11 @@ public class ArchiveTask extends AbstractFixedIntervalTask // Move segments for (DataSegment segment : unusedSegments) { final DataSegment archivedSegment = toolbox.getDataSegmentArchiver().archive(segment); - toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(archivedSegment))); + if (archivedSegment != null) { + toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(archivedSegment))); + } else { + log.info("No action was taken for [%s]", segment); + } } return TaskStatus.success(getId()); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java index 0ca3a9f9adc..736273a2919 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java @@ -22,8 +22,6 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; @@ -34,6 +32,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.timeline.DataSegment; import org.joda.time.Interval; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -63,52 +62,61 @@ public class RestoreTask extends AbstractFixedIntervalTask } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception - { - // Confirm we have a lock (will throw if there isn't exactly one element) - final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + // Confirm we have a lock (will throw if there isn't exactly one element) + final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); - if (!myLock.getDataSource().equals(getDataSource())) { - throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); + if (!myLock.getDataSource().equals(getDataSource())) { + throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); + } + + if (!myLock.getInterval().equals(getInterval())) { + throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval()); + } + + // List unused segments + final List unusedSegments = toolbox + .getTaskActionClient() + .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval())); + + // Verify none of these segments have versions > lock version + for (final DataSegment unusedSegment : unusedSegments) { + if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) { + throw new ISE( + "WTF?! Unused segment[%s] has version[%s] > task version[%s]", + unusedSegment.getIdentifier(), + unusedSegment.getVersion(), + myLock.getVersion() + ); } - if (!myLock.getInterval().equals(getInterval())) { - throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval()); - } - - // List unused segments - final List unusedSegments = toolbox - .getTaskActionClient() - .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval())); - - // Verify none of these segments have versions > lock version - for (final DataSegment unusedSegment : unusedSegments) { - if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) { - throw new ISE( - "WTF?! Unused segment[%s] has version[%s] > task version[%s]", - unusedSegment.getIdentifier(), - unusedSegment.getVersion(), - myLock.getVersion() - ); - } - - log.info("OK to restore segment: %s", unusedSegment.getIdentifier()); - } - - List restoredSegments = Lists.newLinkedList(); - - // Move segments - for (DataSegment segment : unusedSegments) { - restoredSegments.add(toolbox.getDataSegmentArchiver().restore(segment)); + log.info("OK to restore segment: %s", unusedSegment.getIdentifier()); + } + + final List restoredSegments = new ArrayList<>(); + + // Move segments + for (DataSegment segment : unusedSegments) { + final DataSegment restored = toolbox.getDataSegmentArchiver().restore(segment); + if (restored != null) { + restoredSegments.add(restored); + } else { + log.info("Segment [%s] did not move, not updating metadata", segment); } + } + if (restoredSegments.isEmpty()) { + log.info("No segments restored"); + } else { // Update metadata for moved segments toolbox.getTaskActionClient().submit( new SegmentMetadataUpdateAction( ImmutableSet.copyOf(restoredSegments) ) ); - - return TaskStatus.success(getId()); } + + return TaskStatus.success(getId()); + } }