From f3b8d9c047fc6a6dc6625119d1bbd088279136d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 16 Dec 2013 13:54:30 -0800 Subject: [PATCH] safely move files --- .../druid/storage/s3/S3DataSegmentMover.java | 53 ++++++++++++------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java index 01558b5c44d..fe8251a0cc0 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -69,26 +69,8 @@ public class S3DataSegmentMover implements DataSegmentMover throw new SegmentLoadingException("Target S3 baseKey is not specified"); } - if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { - log.info( - "Moving index file[s3://%s/%s] to [s3://%s/%s]", - s3Bucket, - s3Path, - targetS3Bucket, - targetS3Path - ); - s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, new S3Object(targetS3Path), false); - } - if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { - log.info( - "Moving descriptor file[s3://%s/%s] to [s3://%s/%s]", - s3Bucket, - s3DescriptorPath, - targetS3Bucket, - targetS3DescriptorPath - ); - s3Client.moveObject(s3Bucket, s3DescriptorPath, targetS3Bucket, new S3Object(targetS3DescriptorPath), false); - } + safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path); + safeMove(s3Bucket, s3DescriptorPath, targetS3Bucket, targetS3DescriptorPath); return segment.withLoadSpec( ImmutableMap.builder() @@ -102,4 +84,35 @@ public class S3DataSegmentMover implements DataSegmentMover throw new SegmentLoadingException(e, "Unable to move segment[%s]", segment.getIdentifier()); } } + + private void safeMove(String s3Bucket, String s3Path, String targetS3Bucket, String targetS3Path) + throws ServiceException, SegmentLoadingException + { + if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { + log.info( + "Moving file[s3://%s/%s] to [s3://%s/%s]", + s3Bucket, + s3Path, + targetS3Bucket, + targetS3Path + ); + s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, new S3Object(targetS3Path), false); + } else { + // ensure object exists in target location + if(s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) { + log.info( + "Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]", + s3Bucket, s3Path, + targetS3Bucket, targetS3Path + ); + } + else { + throw new SegmentLoadingException( + "Unable to move file [s3://%s/%s] to [s3://%s/%s], not present in either source or target location", + s3Bucket, s3Path, + targetS3Bucket, targetS3Path + ); + } + } + } }