mirror of https://github.com/apache/druid.git
Make S3DataSegmentMover not bother checking for items if they are the same (#3032)
* Make S3DataSegmentMover not bother checking for items if they are the same
This commit is contained in:
parent
d552a5c034
commit
447033985e
|
@ -82,21 +82,21 @@ public class S3DataSegmentMover implements DataSegmentMover
|
||||||
|
|
||||||
return segment.withLoadSpec(
|
return segment.withLoadSpec(
|
||||||
ImmutableMap.<String, Object>builder()
|
ImmutableMap.<String, Object>builder()
|
||||||
.putAll(
|
.putAll(
|
||||||
Maps.filterKeys(
|
Maps.filterKeys(
|
||||||
loadSpec, new Predicate<String>()
|
loadSpec, new Predicate<String>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(String input)
|
public boolean apply(String input)
|
||||||
{
|
{
|
||||||
return !(input.equals("bucket") || input.equals("key"));
|
return !(input.equals("bucket") || input.equals("key"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
.put("bucket", targetS3Bucket)
|
.put("bucket", targetS3Bucket)
|
||||||
.put("key", targetS3Path)
|
.put("key", targetS3Path)
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
catch (ServiceException e) {
|
catch (ServiceException e) {
|
||||||
|
@ -118,33 +118,33 @@ public class S3DataSegmentMover implements DataSegmentMover
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception
|
public Void call() throws Exception
|
||||||
{
|
{
|
||||||
|
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
|
||||||
|
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
|
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
|
||||||
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
|
final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, "");
|
||||||
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
|
if (list.length == 0) {
|
||||||
|
// should never happen
|
||||||
|
throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
|
||||||
|
}
|
||||||
|
final S3Object s3Object = list[0];
|
||||||
|
if (s3Object.getStorageClass() != null &&
|
||||||
|
s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) {
|
||||||
|
log.warn("Cannot move file[s3://%s/%s] of storage class glacier, skipping.", s3Bucket, s3Path);
|
||||||
} else {
|
} else {
|
||||||
final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, "");
|
log.info(
|
||||||
if (list.length == 0) {
|
"Moving file[s3://%s/%s] to [s3://%s/%s]",
|
||||||
// should never happen
|
s3Bucket,
|
||||||
throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
|
s3Path,
|
||||||
}
|
targetS3Bucket,
|
||||||
final S3Object s3Object = list[0];
|
targetS3Path
|
||||||
if (s3Object.getStorageClass() != null &&
|
);
|
||||||
s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) {
|
final S3Object target = new S3Object(targetS3Path);
|
||||||
log.warn("Cannot move file[s3://%s/%s] of storage class glacier, skipping.", s3Bucket, s3Path);
|
if (!config.getDisableAcl()) {
|
||||||
} else {
|
target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
|
||||||
log.info(
|
|
||||||
"Moving file[s3://%s/%s] to [s3://%s/%s]",
|
|
||||||
s3Bucket,
|
|
||||||
s3Path,
|
|
||||||
targetS3Bucket,
|
|
||||||
targetS3Path
|
|
||||||
);
|
|
||||||
final S3Object target = new S3Object(targetS3Path);
|
|
||||||
if (!config.getDisableAcl()) {
|
|
||||||
target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
|
|
||||||
}
|
|
||||||
s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, target, false);
|
|
||||||
}
|
}
|
||||||
|
s3Client.moveObject(s3Bucket, s3Path, targetS3Bucket, target, false);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// ensure object exists in target location
|
// ensure object exists in target location
|
||||||
|
@ -157,8 +157,10 @@ public class S3DataSegmentMover implements DataSegmentMover
|
||||||
} else {
|
} else {
|
||||||
throw new SegmentLoadingException(
|
throw new SegmentLoadingException(
|
||||||
"Unable to move file [s3://%s/%s] to [s3://%s/%s], not present in either source or target location",
|
"Unable to move file [s3://%s/%s] to [s3://%s/%s], not present in either source or target location",
|
||||||
s3Bucket, s3Path,
|
s3Bucket,
|
||||||
targetS3Bucket, targetS3Path
|
s3Path,
|
||||||
|
targetS3Bucket,
|
||||||
|
targetS3Path
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,6 +111,52 @@ public class S3DataSegmentMoverTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIgnoresGoneButAlreadyMoved() throws Exception
|
||||||
|
{
|
||||||
|
MockStorageService mockS3Client = new MockStorageService();
|
||||||
|
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
|
||||||
|
mover.move(new DataSegment(
|
||||||
|
"test",
|
||||||
|
new Interval("2013-01-01/2013-01-02"),
|
||||||
|
"1",
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"key",
|
||||||
|
"baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
|
||||||
|
"bucket",
|
||||||
|
"DOES NOT EXIST"
|
||||||
|
),
|
||||||
|
ImmutableList.of("dim1", "dim1"),
|
||||||
|
ImmutableList.of("metric1", "metric2"),
|
||||||
|
new NoneShardSpec(),
|
||||||
|
0,
|
||||||
|
1
|
||||||
|
), ImmutableMap.<String, Object>of("bucket", "DOES NOT EXIST", "baseKey", "baseKey"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = SegmentLoadingException.class)
|
||||||
|
public void testFailsToMoveMissing() throws Exception
|
||||||
|
{
|
||||||
|
MockStorageService mockS3Client = new MockStorageService();
|
||||||
|
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
|
||||||
|
mover.move(new DataSegment(
|
||||||
|
"test",
|
||||||
|
new Interval("2013-01-01/2013-01-02"),
|
||||||
|
"1",
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
"key",
|
||||||
|
"baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip",
|
||||||
|
"bucket",
|
||||||
|
"DOES NOT EXIST"
|
||||||
|
),
|
||||||
|
ImmutableList.of("dim1", "dim1"),
|
||||||
|
ImmutableList.of("metric1", "metric2"),
|
||||||
|
new NoneShardSpec(),
|
||||||
|
0,
|
||||||
|
1
|
||||||
|
), ImmutableMap.<String, Object>of("bucket", "DOES NOT EXIST", "baseKey", "baseKey2"));
|
||||||
|
}
|
||||||
|
|
||||||
private class MockStorageService extends RestS3Service {
|
private class MockStorageService extends RestS3Service {
|
||||||
Map<String, Set<String>> storage = Maps.newHashMap();
|
Map<String, Set<String>> storage = Maps.newHashMap();
|
||||||
boolean moved = false;
|
boolean moved = false;
|
||||||
|
|
Loading…
Reference in New Issue