diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java index cffe2be30f6..983837865e8 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -24,9 +24,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.inject.Inject; - +import io.druid.java.util.common.IOE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.MapUtils; +import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentMover; @@ -38,6 +39,7 @@ import org.jets3t.service.acl.gs.GSAccessControlList; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Object; +import java.io.IOException; import java.util.Map; import java.util.concurrent.Callable; @@ -116,74 +118,22 @@ public class S3DataSegmentMover implements DataSegmentMover { try { S3Utils.retryS3Operation( - new Callable() - { - @Override - public Void call() throws Exception - { - if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) { - log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path); - return null; - } - if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { - final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, ""); - if (list.length == 0) { - // should never happen - throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path); - } - final S3Object s3Object = list[0]; - if (s3Object.getStorageClass() != null && - s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) { - log.warn("Cannot move file[s3://%s/%s] of storage class glacier, skipping.", s3Bucket, s3Path); - } else { - final String copyMsg = StringUtils.format( - "[s3://%s/%s] to [s3://%s/%s]", s3Bucket, - s3Path, - targetS3Bucket, - targetS3Path - ); - log.info( - "Moving file %s", - copyMsg - ); - final S3Object target = new S3Object(targetS3Path); - if (!config.getDisableAcl()) { - target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); - } - final Map copyResult = s3Client.moveObject( - s3Bucket, - s3Path, - targetS3Bucket, - target, - false - ); - if (copyResult != null && copyResult.containsKey("DeleteException")) { - log.error("Error Deleting data after copy %s: %s", copyMsg, copyResult); - // Maybe retry deleting here? - } else { - log.debug("Finished moving file %s", copyMsg); - } - } - } else { - // ensure object exists in target location - if (s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) { - log.info( - "Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]", - s3Bucket, s3Path, - targetS3Bucket, targetS3Path - ); - } else { - throw new SegmentLoadingException( - "Unable to move file [s3://%s/%s] to [s3://%s/%s], not present in either source or target location", - s3Bucket, - s3Path, - targetS3Bucket, - targetS3Path - ); - } - } + (Callable) () -> { + final String copyMsg = StringUtils.format( + "[s3://%s/%s] to [s3://%s/%s]", + s3Bucket, + s3Path, + targetS3Bucket, + targetS3Path + ); + try { + selfCheckingMove(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg); return null; } + catch (ServiceException | IOException | SegmentLoadingException e) { + log.info(e, "Error while trying to move " + copyMsg); + throw e; + } } ); } @@ -193,4 +143,103 @@ public class S3DataSegmentMover implements DataSegmentMover throw Throwables.propagate(e); } } + + /** + * Copies an object and after that checks that the object is present at the target location, via a separate API call. + * If it is not, an exception is thrown, and the object is not deleted at the old location. This "paranoic" check + * is added after it was observed that S3 may report a successful move, and the object is not found at the target + * location. + */ + private void selfCheckingMove( + String s3Bucket, + String targetS3Bucket, + String s3Path, + String targetS3Path, + String copyMsg + ) throws ServiceException, IOException, SegmentLoadingException + { + if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) { + log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path); + return; + } + if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { + final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, ""); + if (list.length == 0) { + // should never happen + throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path); + } + final S3Object s3Object = list[0]; + if (s3Object.getStorageClass() != null && + s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) { + throw new ServiceException(StringUtils.format( + "Cannot move file[s3://%s/%s] of storage class glacier, skipping.", + s3Bucket, + s3Path + )); + } else { + log.info("Moving file %s", copyMsg); + final S3Object target = new S3Object(targetS3Path); + if (!config.getDisableAcl()) { + target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); + } + s3Client.copyObject( + s3Bucket, + s3Path, + targetS3Bucket, + target, + false + ); + if (!s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) { + throw new IOE( + "After copy was reported as successful the file doesn't exist in the target location [%s]", + copyMsg + ); + } + deleteWithRetriesSilent(s3Bucket, s3Path); + log.debug("Finished moving file %s", copyMsg); + } + } else { + // ensure object exists in target location + if (s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) { + log.info( + "Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]", + s3Bucket, s3Path, + targetS3Bucket, targetS3Path + ); + } else { + throw new SegmentLoadingException( + "Unable to move file %s, not present in either source or target location", + copyMsg + ); + } + } + } + + private void deleteWithRetriesSilent(final String s3Bucket, final String s3Path) + { + try { + deleteWithRetries(s3Bucket, s3Path); + } + catch (Exception e) { + log.error(e, "Failed to delete file [s3://%s/%s], giving up", s3Bucket, s3Path); + } + } + + private void deleteWithRetries(final String s3Bucket, final String s3Path) throws Exception + { + RetryUtils.retry( + (Callable) () -> { + try { + s3Client.deleteObject(s3Bucket, s3Path); + return null; + } + catch (Exception e) { + log.info(e, "Error while trying to delete [s3://%s/%s]", s3Bucket, s3Path); + throw e; + } + }, + S3Utils.S3RETRY, + 3 + ); + } } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java index fbd676f700b..220cf93da77 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java @@ -160,7 +160,8 @@ public class S3DataSegmentMoverTest private static class MockStorageService extends RestS3Service { Map> storage = Maps.newHashMap(); - boolean moved = false; + boolean copied = false; + boolean deletedOld = false; private MockStorageService() throws S3ServiceException { @@ -169,7 +170,7 @@ public class S3DataSegmentMoverTest public boolean didMove() { - return moved; + return copied && deletedOld; } @Override @@ -196,7 +197,7 @@ public class S3DataSegmentMoverTest } @Override - public Map moveObject( + public Map copyObject( String sourceBucketName, String sourceObjectKey, String destinationBucketName, @@ -204,19 +205,25 @@ public class S3DataSegmentMoverTest boolean replaceMetadata ) throws ServiceException { - moved = true; + copied = true; if (isObjectInBucket(sourceBucketName, sourceObjectKey)) { this.putObject(destinationBucketName, new S3Object(destinationObject.getKey())); - storage.get(sourceBucketName).remove(sourceObjectKey); } return null; } + @Override + public void deleteObject(String bucket, String objectKey) throws S3ServiceException + { + deletedOld = true; + storage.get(bucket).remove(objectKey); + } + @Override public S3Object putObject(String bucketName, S3Object object) throws S3ServiceException { if (!storage.containsKey(bucketName)) { - storage.put(bucketName, Sets.newHashSet()); + storage.put(bucketName, Sets.newHashSet()); } storage.get(bucketName).add(object.getKey()); return object;