mirror of https://github.com/apache/druid.git
Self-checking S3DataSegmentMover.safeMove() (#4725)
* Self-checking S3DataSegmentMover.safeMove() * Remove unused in S3DataSegmentMoverTest * Address comments * More specific exceptions * Remove delete check
This commit is contained in:
parent
267f415dc3
commit
cd5de123bd
|
@ -24,9 +24,10 @@ import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import io.druid.java.util.common.IOE;
|
||||||
import io.druid.java.util.common.ISE;
|
import io.druid.java.util.common.ISE;
|
||||||
import io.druid.java.util.common.MapUtils;
|
import io.druid.java.util.common.MapUtils;
|
||||||
|
import io.druid.java.util.common.RetryUtils;
|
||||||
import io.druid.java.util.common.StringUtils;
|
import io.druid.java.util.common.StringUtils;
|
||||||
import io.druid.java.util.common.logger.Logger;
|
import io.druid.java.util.common.logger.Logger;
|
||||||
import io.druid.segment.loading.DataSegmentMover;
|
import io.druid.segment.loading.DataSegmentMover;
|
||||||
|
@ -38,6 +39,7 @@ import org.jets3t.service.acl.gs.GSAccessControlList;
|
||||||
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
||||||
import org.jets3t.service.model.S3Object;
|
import org.jets3t.service.model.S3Object;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
@ -116,74 +118,22 @@ public class S3DataSegmentMover implements DataSegmentMover
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
S3Utils.retryS3Operation(
|
S3Utils.retryS3Operation(
|
||||||
new Callable<Void>()
|
(Callable<Void>) () -> {
|
||||||
{
|
final String copyMsg = StringUtils.format(
|
||||||
@Override
|
"[s3://%s/%s] to [s3://%s/%s]",
|
||||||
public Void call() throws Exception
|
s3Bucket,
|
||||||
{
|
s3Path,
|
||||||
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
|
targetS3Bucket,
|
||||||
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
|
targetS3Path
|
||||||
return null;
|
);
|
||||||
}
|
try {
|
||||||
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
|
selfCheckingMove(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg);
|
||||||
final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, "");
|
|
||||||
if (list.length == 0) {
|
|
||||||
// should never happen
|
|
||||||
throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
|
|
||||||
}
|
|
||||||
final S3Object s3Object = list[0];
|
|
||||||
if (s3Object.getStorageClass() != null &&
|
|
||||||
s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) {
|
|
||||||
log.warn("Cannot move file[s3://%s/%s] of storage class glacier, skipping.", s3Bucket, s3Path);
|
|
||||||
} else {
|
|
||||||
final String copyMsg = StringUtils.format(
|
|
||||||
"[s3://%s/%s] to [s3://%s/%s]", s3Bucket,
|
|
||||||
s3Path,
|
|
||||||
targetS3Bucket,
|
|
||||||
targetS3Path
|
|
||||||
);
|
|
||||||
log.info(
|
|
||||||
"Moving file %s",
|
|
||||||
copyMsg
|
|
||||||
);
|
|
||||||
final S3Object target = new S3Object(targetS3Path);
|
|
||||||
if (!config.getDisableAcl()) {
|
|
||||||
target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
|
|
||||||
}
|
|
||||||
final Map<String, Object> copyResult = s3Client.moveObject(
|
|
||||||
s3Bucket,
|
|
||||||
s3Path,
|
|
||||||
targetS3Bucket,
|
|
||||||
target,
|
|
||||||
false
|
|
||||||
);
|
|
||||||
if (copyResult != null && copyResult.containsKey("DeleteException")) {
|
|
||||||
log.error("Error Deleting data after copy %s: %s", copyMsg, copyResult);
|
|
||||||
// Maybe retry deleting here?
|
|
||||||
} else {
|
|
||||||
log.debug("Finished moving file %s", copyMsg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// ensure object exists in target location
|
|
||||||
if (s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) {
|
|
||||||
log.info(
|
|
||||||
"Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
|
|
||||||
s3Bucket, s3Path,
|
|
||||||
targetS3Bucket, targetS3Path
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
throw new SegmentLoadingException(
|
|
||||||
"Unable to move file [s3://%s/%s] to [s3://%s/%s], not present in either source or target location",
|
|
||||||
s3Bucket,
|
|
||||||
s3Path,
|
|
||||||
targetS3Bucket,
|
|
||||||
targetS3Path
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
catch (ServiceException | IOException | SegmentLoadingException e) {
|
||||||
|
log.info(e, "Error while trying to move " + copyMsg);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -193,4 +143,103 @@ public class S3DataSegmentMover implements DataSegmentMover
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Copies an object and after that checks that the object is present at the target location, via a separate API call.
|
||||||
|
* If it is not, an exception is thrown, and the object is not deleted at the old location. This "paranoic" check
|
||||||
|
* is added after it was observed that S3 may report a successful move, and the object is not found at the target
|
||||||
|
* location.
|
||||||
|
*/
|
||||||
|
private void selfCheckingMove(
|
||||||
|
String s3Bucket,
|
||||||
|
String targetS3Bucket,
|
||||||
|
String s3Path,
|
||||||
|
String targetS3Path,
|
||||||
|
String copyMsg
|
||||||
|
) throws ServiceException, IOException, SegmentLoadingException
|
||||||
|
{
|
||||||
|
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
|
||||||
|
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
|
||||||
|
final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, "");
|
||||||
|
if (list.length == 0) {
|
||||||
|
// should never happen
|
||||||
|
throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
|
||||||
|
}
|
||||||
|
final S3Object s3Object = list[0];
|
||||||
|
if (s3Object.getStorageClass() != null &&
|
||||||
|
s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) {
|
||||||
|
throw new ServiceException(StringUtils.format(
|
||||||
|
"Cannot move file[s3://%s/%s] of storage class glacier, skipping.",
|
||||||
|
s3Bucket,
|
||||||
|
s3Path
|
||||||
|
));
|
||||||
|
} else {
|
||||||
|
log.info("Moving file %s", copyMsg);
|
||||||
|
final S3Object target = new S3Object(targetS3Path);
|
||||||
|
if (!config.getDisableAcl()) {
|
||||||
|
target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
|
||||||
|
}
|
||||||
|
s3Client.copyObject(
|
||||||
|
s3Bucket,
|
||||||
|
s3Path,
|
||||||
|
targetS3Bucket,
|
||||||
|
target,
|
||||||
|
false
|
||||||
|
);
|
||||||
|
if (!s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) {
|
||||||
|
throw new IOE(
|
||||||
|
"After copy was reported as successful the file doesn't exist in the target location [%s]",
|
||||||
|
copyMsg
|
||||||
|
);
|
||||||
|
}
|
||||||
|
deleteWithRetriesSilent(s3Bucket, s3Path);
|
||||||
|
log.debug("Finished moving file %s", copyMsg);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// ensure object exists in target location
|
||||||
|
if (s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) {
|
||||||
|
log.info(
|
||||||
|
"Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
|
||||||
|
s3Bucket, s3Path,
|
||||||
|
targetS3Bucket, targetS3Path
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
throw new SegmentLoadingException(
|
||||||
|
"Unable to move file %s, not present in either source or target location",
|
||||||
|
copyMsg
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteWithRetriesSilent(final String s3Bucket, final String s3Path)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
deleteWithRetries(s3Bucket, s3Path);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.error(e, "Failed to delete file [s3://%s/%s], giving up", s3Bucket, s3Path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteWithRetries(final String s3Bucket, final String s3Path) throws Exception
|
||||||
|
{
|
||||||
|
RetryUtils.retry(
|
||||||
|
(Callable<Void>) () -> {
|
||||||
|
try {
|
||||||
|
s3Client.deleteObject(s3Bucket, s3Path);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.info(e, "Error while trying to delete [s3://%s/%s]", s3Bucket, s3Path);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
S3Utils.S3RETRY,
|
||||||
|
3
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -160,7 +160,8 @@ public class S3DataSegmentMoverTest
|
||||||
private static class MockStorageService extends RestS3Service
|
private static class MockStorageService extends RestS3Service
|
||||||
{
|
{
|
||||||
Map<String, Set<String>> storage = Maps.newHashMap();
|
Map<String, Set<String>> storage = Maps.newHashMap();
|
||||||
boolean moved = false;
|
boolean copied = false;
|
||||||
|
boolean deletedOld = false;
|
||||||
|
|
||||||
private MockStorageService() throws S3ServiceException
|
private MockStorageService() throws S3ServiceException
|
||||||
{
|
{
|
||||||
|
@ -169,7 +170,7 @@ public class S3DataSegmentMoverTest
|
||||||
|
|
||||||
public boolean didMove()
|
public boolean didMove()
|
||||||
{
|
{
|
||||||
return moved;
|
return copied && deletedOld;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -196,7 +197,7 @@ public class S3DataSegmentMoverTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Object> moveObject(
|
public Map<String, Object> copyObject(
|
||||||
String sourceBucketName,
|
String sourceBucketName,
|
||||||
String sourceObjectKey,
|
String sourceObjectKey,
|
||||||
String destinationBucketName,
|
String destinationBucketName,
|
||||||
|
@ -204,19 +205,25 @@ public class S3DataSegmentMoverTest
|
||||||
boolean replaceMetadata
|
boolean replaceMetadata
|
||||||
) throws ServiceException
|
) throws ServiceException
|
||||||
{
|
{
|
||||||
moved = true;
|
copied = true;
|
||||||
if (isObjectInBucket(sourceBucketName, sourceObjectKey)) {
|
if (isObjectInBucket(sourceBucketName, sourceObjectKey)) {
|
||||||
this.putObject(destinationBucketName, new S3Object(destinationObject.getKey()));
|
this.putObject(destinationBucketName, new S3Object(destinationObject.getKey()));
|
||||||
storage.get(sourceBucketName).remove(sourceObjectKey);
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteObject(String bucket, String objectKey) throws S3ServiceException
|
||||||
|
{
|
||||||
|
deletedOld = true;
|
||||||
|
storage.get(bucket).remove(objectKey);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public S3Object putObject(String bucketName, S3Object object) throws S3ServiceException
|
public S3Object putObject(String bucketName, S3Object object) throws S3ServiceException
|
||||||
{
|
{
|
||||||
if (!storage.containsKey(bucketName)) {
|
if (!storage.containsKey(bucketName)) {
|
||||||
storage.put(bucketName, Sets.<String>newHashSet());
|
storage.put(bucketName, Sets.newHashSet());
|
||||||
}
|
}
|
||||||
storage.get(bucketName).add(object.getKey());
|
storage.get(bucketName).add(object.getKey());
|
||||||
return object;
|
return object;
|
||||||
|
|
Loading…
Reference in New Issue