Support replaceExisting parameter for segments pushers (#5187)

* support replaceExisting parameter for segments pushers

* code review changes

* code review changes
David Lim 2018-01-03 17:13:21 -07:00 committed by Gian Merlino
parent 59af4d3b14
commit a7967ade4d
32 changed files with 382 additions and 172 deletions


@ -39,7 +39,30 @@ public interface DataSegmentPusher
@Deprecated
String getPathForHadoop(String dataSource);
String getPathForHadoop();
DataSegment push(File file, DataSegment segment) throws IOException;
/**
* Pushes index files and segment descriptor to deep storage.
* @param file directory containing index files
* @param segment segment descriptor
* @param replaceExisting overwrites existing objects if true, else leaves existing objects unchanged on conflict.
*                        The behavior of the indexer determines whether this should be true or false. For example, since
*                        Tranquility does not guarantee that replica tasks will generate indexes with the same data, the
*                        first segment pushed should be favored; otherwise multiple historicals could load segments with
*                        the same identifier but different contents. On the other hand, an indexer that maintains
*                        exactly-once semantics by storing checkpoint data can lose or repeat data if it fails to write a
*                        segment because the segment already exists and overwriting is not permitted. This situation can
*                        occur if a task fails after pushing to deep storage but before writing to the metadata storage;
*                        see: https://github.com/druid-io/druid/issues/5161.
*
* If replaceExisting is true, existing objects MUST be overwritten, since failure to do so
* will break exactly-once semantics. If replaceExisting is false, existing objects SHOULD be
* prioritized but it is acceptable if they are overwritten (deep storages may be eventually
* consistent or otherwise unable to support transactional writes).
* @return segment descriptor
* @throws IOException
*/
DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException;
//use map instead of LoadSpec class to avoid dependency pollution.
Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);
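
The contract above reduces to a conditional overwrite at the storage layer. The following is a minimal, self-contained sketch (not part of this commit) of that check, written against the local filesystem instead of a real deep-storage client; the class and method names are illustrative only.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustration only: the replaceExisting check that each pusher in this patch applies to its own storage API.
public final class ReplaceExistingSketch
{
  private ReplaceExistingSketch() {}

  /** Returns true if the file was written, false if it was skipped because it already exists. */
  public static boolean writeSegmentFile(Path source, Path target, boolean replaceExisting) throws IOException
  {
    if (!replaceExisting && Files.exists(target)) {
      // replaceExisting == false: favor whatever was pushed first (e.g. Tranquility replicas).
      return false;
    }
    // replaceExisting == true: existing objects MUST be overwritten to preserve exactly-once semantics.
    Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
    return true;
  }
}

Real pushers substitute their storage API's existence check and upload call for Files.exists and Files.copy, as the per-extension changes below show.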


@ -109,12 +109,13 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final long size,
final File compressedSegmentData,
final File descriptorFile,
final Map<String, String> azurePaths
final Map<String, String> azurePaths,
final boolean replaceExisting
)
throws StorageException, IOException, URISyntaxException
{
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"));
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"), replaceExisting);
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"), replaceExisting);
final DataSegment outSegment = segment
.withSize(size)
@ -131,9 +132,9 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment segment) throws IOException
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting)
throws IOException
{
log.info("Uploading [%s] to Azure.", indexFilesDir);
final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
@ -153,7 +154,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
@Override
public DataSegment call() throws Exception
{
return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths);
return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting);
}
},
config.getMaxTries()


@ -25,7 +25,6 @@ import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import io.druid.java.util.common.logger.Logger;
import java.io.File;
@ -81,14 +80,24 @@ public class AzureStorage
}
public void uploadBlob(final File file, final String containerName, final String blobPath)
public void uploadBlob(
final File file,
final String containerName,
final String blobPath,
final boolean replaceExisting
)
throws IOException, StorageException, URISyntaxException
{
CloudBlobContainer container = getCloudBlobContainer(containerName);
try (FileInputStream stream = new FileInputStream(file)) {
CloudBlockBlob blob = container.getBlockBlobReference(blobPath);
blob.upload(stream, file.length());
if (!replaceExisting && blob.exists()) {
log.info("Skipping push because blob [%s] exists && replaceExisting == false", blobPath);
} else {
blob.upload(stream, file.length());
}
}
}


@ -59,7 +59,7 @@ public class AzureTaskLogs implements TaskLogs
try {
AzureUtils.retryAzureOperation(
(Callable<Void>) () -> {
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey, true);
return null;
},
config.getMaxTries()


@ -104,7 +104,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
size
);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
}
@ -133,9 +133,9 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
final File descriptorFile = new File("descriptor.json");
final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);
azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"));
azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"), true);
expectLastCall();
azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"));
azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"), true);
expectLastCall();
replayAll();
@ -146,7 +146,8 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
0, // empty file
compressedSegmentData,
descriptorFile,
azurePaths
azurePaths,
true
);
assertEquals(compressedSegmentData.length(), pushedDataSegment.getSize());


@ -65,7 +65,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
try {
final File logFile = new File(tmpDir, "log");
azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log");
azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log", true);
expectLastCall();
replayAll();


@ -24,8 +24,9 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.recipes.storage.ChunkedStorage;
import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
@ -53,7 +54,8 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
@Inject
public CassandraDataSegmentPusher(
CassandraDataSegmentConfig config,
ObjectMapper jsonMapper)
ObjectMapper jsonMapper
)
{
super(config);
this.jsonMapper = jsonMapper;
@ -73,13 +75,14 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
}
@Override
public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
public DataSegment push(final File indexFilesDir, DataSegment segment, final boolean replaceExisting)
throws IOException
{
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
this.getStorageDir(segment)
);
);
// Create index
final File compressedIndexFile = File.createTempFile("druid", "index.zip");
@ -89,26 +92,28 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
int version = SegmentUtils.getVersionFromDir(indexFilesDir);
try {
long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
.withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment);
MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null);
mutation.execute();
log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
if (!replaceExisting && doesObjectExist(indexStorage, key)) {
log.info("Skipping push because key [%s] exists && replaceExisting == false", key);
} else {
long start = System.currentTimeMillis();
ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
.withConcurrencyLevel(CONCURRENCY).call();
byte[] json = jsonMapper.writeValueAsBytes(segment);
MutationBatch mutation = this.keyspace.prepareMutationBatch();
mutation.withRow(descriptorStorage, key)
.putColumn("lastmodified", System.currentTimeMillis(), null)
.putColumn("descriptor", json, null);
mutation.execute();
log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
}
}
catch (Exception e) {
throw new IOException(e);
}
segment = segment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object> of("type", "c*", "key", key)
)
.withBinaryVersion(version);
.withLoadSpec(ImmutableMap.<String, Object>of("type", "c*", "key", key))
.withBinaryVersion(version);
log.info("Deleting zipped index File[%s]", compressedIndexFile);
compressedIndexFile.delete();
@ -120,4 +125,14 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
{
throw new UnsupportedOperationException("not supported");
}
private boolean doesObjectExist(ChunkedStorageProvider provider, String objectName) throws Exception
{
try {
return ChunkedStorage.newInfoReader(provider, objectName).call().isValidForRead();
}
catch (NotFoundException e) {
return false;
}
}
}


@ -74,7 +74,8 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting)
throws IOException
{
final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), getStorageDir(inSegment));
@ -98,18 +99,23 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
segmentPath, outFile, objectApi.getRegion(),
objectApi.getContainer()
);
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descFile,
objectApi.getRegion(), objectApi.getContainer()
);
log.info("Pushing %s.", descriptorData.getPath());
objectApi.put(descriptorData);
if (!replaceExisting && objectApi.exists(segmentData.getPath())) {
log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath());
} else {
log.info("Pushing %s.", segmentData.getPath());
objectApi.put(segmentData);
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
CloudFilesObject descriptorData = new CloudFilesObject(
segmentPath, descFile,
objectApi.getRegion(), objectApi.getContainer()
);
log.info("Pushing %s.", descriptorData.getPath());
objectApi.put(descriptorData);
}
final DataSegment outSegment = inSegment
.withSize(indexSize)


@ -58,4 +58,9 @@ public class CloudFilesObjectApiProxy
Payload payload = swiftObject.getPayload();
return new CloudFilesObject(payload, this.region, this.container, path);
}
public boolean exists(String path)
{
return objectApi.getWithoutBody(path) != null;
}
}


@ -84,7 +84,7 @@ public class CloudFilesDataSegmentPusherTest
size
);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());


@ -93,7 +93,8 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
return descriptorFile;
}
public void insert(final File file, final String contentType, final String path) throws IOException
public void insert(final File file, final String contentType, final String path, final boolean replaceExisting)
throws IOException
{
LOG.info("Inserting [%s] to [%s]", file, path);
@ -102,11 +103,16 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
InputStreamContent mediaContent = new InputStreamContent(contentType, fileSteam);
mediaContent.setLength(file.length());
storage.insert(config.getBucket(), path, mediaContent);
if (!replaceExisting && storage.exists(config.getBucket(), path)) {
LOG.info("Skipping push because path [%s] exists && replaceExisting == false", path);
} else {
storage.insert(config.getBucket(), path, mediaContent);
}
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment segment) throws IOException
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting)
throws IOException
{
LOG.info("Uploading [%s] to Google.", indexFilesDir);
@ -128,8 +134,8 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
descriptorFile = createDescriptorFile(jsonMapper, outSegment);
insert(indexFile, "application/zip", indexPath);
insert(descriptorFile, "application/json", descriptorPath);
insert(indexFile, "application/zip", indexPath, replaceExisting);
insert(descriptorFile, "application/json", descriptorPath, replaceExisting);
return outSegment;
}


@ -103,20 +103,30 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
storage,
googleAccountConfig,
jsonMapper
).addMockedMethod("insert", File.class, String.class, String.class).createMock();
).addMockedMethod("insert", File.class, String.class, String.class, boolean.class).createMock();
final String storageDir = pusher.getStorageDir(segmentToPush);
final String indexPath = prefix + "/" + storageDir + "/" + "index.zip";
final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json";
pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/zip"), EasyMock.eq(indexPath));
pusher.insert(
EasyMock.anyObject(File.class),
EasyMock.eq("application/zip"),
EasyMock.eq(indexPath),
EasyMock.eq(true)
);
expectLastCall();
pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/json"), EasyMock.eq(descriptorPath));
pusher.insert(
EasyMock.anyObject(File.class),
EasyMock.eq("application/json"),
EasyMock.eq(descriptorPath),
EasyMock.eq(true)
);
expectLastCall();
replayAll();
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
Assert.assertEquals(segmentToPush, segment);


@ -89,7 +89,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(File inDir, DataSegment segment) throws IOException
public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting) throws IOException
{
final String storageDir = this.getStorageDir(segment);
@ -145,8 +145,8 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
// Create parent if it does not exist, recreation is not an error
fs.mkdirs(outIndexFile.getParent());
copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile);
copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile, replaceExisting);
copyFilesWithChecks(fs, tmpIndexFile, outIndexFile, replaceExisting);
}
finally {
try {
@ -162,9 +162,10 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
return dataSegment;
}
private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to) throws IOException
private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to, final boolean replaceExisting)
throws IOException
{
if (!HadoopFsWrapper.rename(fs, from, to)) {
if (!HadoopFsWrapper.rename(fs, from, to, replaceExisting)) {
if (fs.exists(to)) {
log.info(
"Unable to rename temp Index file[%s] to final segment path [%s]. "


@ -36,23 +36,25 @@ public class HadoopFsWrapper
private HadoopFsWrapper() {}
/**
* Same as FileSystem.rename(from, to, Options.Rename.NONE) . That is,
* it returns "false" when "to" directory already exists. It is different from FileSystem.rename(from, to)
* which moves "from" directory inside "to" directory if it already exists.
* Same as FileSystem.rename(from, to, Options.Rename). It is different from FileSystem.rename(from, to) which moves
* "from" directory inside "to" directory if it already exists.
*
* @param from
* @param to
* @return
* @throws IOException
* @param replaceExisting if existing files should be overwritten
*
* @return true if operation succeeded, false if replaceExisting == false and destination already exists
*
* @throws IOException if trying to overwrite a non-empty directory
*/
public static boolean rename(FileSystem fs, Path from, Path to) throws IOException
public static boolean rename(FileSystem fs, Path from, Path to, boolean replaceExisting) throws IOException
{
try {
fs.rename(from, to, Options.Rename.NONE);
fs.rename(from, to, replaceExisting ? Options.Rename.OVERWRITE : Options.Rename.NONE);
return true;
}
catch (IOException ex) {
log.warn(ex, "Failed to rename [%s] to [%s].", from, to);
catch (FileAlreadyExistsException ex) {
log.info(ex, "Destination exists while renaming [%s] to [%s]", from, to);
return false;
}
}
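
A hedged usage sketch of the rename wrapper as changed above (not from the commit; the FileSystem setup and paths are placeholders, and the import of HadoopFsWrapper is omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class HadoopRenameSketch
{
  public static void main(String[] args) throws IOException
  {
    final FileSystem fs = FileSystem.get(new Configuration());
    final Path tmpIndexFile = new Path("/tmp/druid/intermediate/index.zip"); // placeholder temp path
    final Path outIndexFile = new Path("/druid/segments/ds/v1/0/index.zip"); // placeholder final path

    // replaceExisting == true maps to Options.Rename.OVERWRITE, so an existing destination is replaced.
    // With false, an existing destination makes rename() return false and the caller decides how to react,
    // which is how copyFilesWithChecks() in HdfsDataSegmentPusher uses it.
    if (!HadoopFsWrapper.rename(fs, tmpIndexFile, outIndexFile, false)) {
      System.out.println("Destination exists and replaceExisting == false; keeping the existing segment.");
    }
  }
}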


@ -161,7 +161,7 @@ public class HdfsDataSegmentPusherTest
size
);
DataSegment segment = pusher.push(segmentDir, segmentToPush);
DataSegment segment = pusher.push(segmentDir, segmentToPush, true);
String indexUri = StringUtils.format(
@ -201,7 +201,7 @@ public class HdfsDataSegmentPusherTest
File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath));
outDir.setReadOnly();
try {
pusher.push(segmentDir, segmentToPush);
pusher.push(segmentDir, segmentToPush, true);
}
catch (IOException e) {
Assert.fail("should not throw exception");
@ -246,7 +246,7 @@ public class HdfsDataSegmentPusherTest
}
for (int i = 0; i < numberOfSegments; i++) {
final DataSegment pushedSegment = pusher.push(segmentDir, segments[i]);
final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], true);
String indexUri = StringUtils.format(
"%s/%s/%d_index.zip",
@ -308,7 +308,7 @@ public class HdfsDataSegmentPusherTest
File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath));
outDir.setReadOnly();
try {
pusher.push(segmentDir, segments[i]);
pusher.push(segmentDir, segments[i], true);
}
catch (IOException e) {
Assert.fail("should not throw exception");


@ -88,7 +88,8 @@ public class S3DataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting)
throws IOException
{
final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment));
@ -105,21 +106,10 @@ public class S3DataSegmentPusher implements DataSegmentPusher
public DataSegment call() throws Exception
{
S3Object toPush = new S3Object(zipOutFile);
final String outputBucket = config.getBucket();
final String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
toPush.setBucketName(outputBucket);
toPush.setKey(s3Path);
if (!config.getDisableAcl()) {
toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
}
log.info("Pushing %s.", toPush);
s3Client.putObject(outputBucket, toPush);
putObject(config.getBucket(), s3Path, toPush, replaceExisting);
final DataSegment outSegment = inSegment.withSize(indexSize)
.withLoadSpec(makeLoadSpec(outputBucket, toPush.getKey()))
.withLoadSpec(makeLoadSpec(config.getBucket(), toPush.getKey()))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
File descriptorFile = File.createTempFile("druid", "descriptor.json");
@ -127,14 +117,13 @@ public class S3DataSegmentPusher implements DataSegmentPusher
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment));
S3Object descriptorObject = new S3Object(descriptorFile);
descriptorObject.setBucketName(outputBucket);
descriptorObject.setKey(s3DescriptorPath);
if (!config.getDisableAcl()) {
descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
}
log.info("Pushing %s", descriptorObject);
s3Client.putObject(outputBucket, descriptorObject);
putObject(
config.getBucket(),
S3Utils.descriptorPathForSegmentPath(s3Path),
descriptorObject,
replaceExisting
);
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
@ -164,7 +153,6 @@ public class S3DataSegmentPusher implements DataSegmentPusher
/**
* Any change in loadSpec need to be reflected {@link io.druid.indexer.JobHelper#getURIFromSegment()}
*
*/
@SuppressWarnings("JavadocReference")
private Map<String, Object> makeLoadSpec(String bucket, String key)
@ -180,4 +168,22 @@ public class S3DataSegmentPusher implements DataSegmentPusher
config.isUseS3aSchema() ? "s3a" : "s3n"
);
}
private void putObject(String bucketName, String path, S3Object object, boolean replaceExisting)
throws ServiceException
{
object.setBucketName(bucketName);
object.setKey(path);
if (!config.getDisableAcl()) {
object.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
}
log.info("Pushing %s.", object);
if (!replaceExisting && S3Utils.isObjectInBucket(s3Client, bucketName, object.getKey())) {
log.info("Skipping push because key [%s] exists && replaceExisting == false", object.getKey());
} else {
s3Client.putObject(bucketName, object);
}
}
}


@ -113,7 +113,7 @@ public class S3DataSegmentPusherTest
size
);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
Assert.assertEquals(1, (int) segment.getBinaryVersion());


@ -199,7 +199,10 @@ public class YeOldePlumberSchool implements PlumberSchool
.withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))
.withBinaryVersion(SegmentUtils.getVersionFromDir(fileToUpload));
dataSegmentPusher.push(fileToUpload, segmentToUpload);
// This plumber is only used in batch ingestion situations where you do not have replica tasks pushing
// segments with the same identifier but potentially different contents. In case of conflict, favor the most
// recently pushed segment (replaceExisting == true).
dataSegmentPusher.push(fileToUpload, segmentToUpload, true);
log.info(
"Uploaded segment[%s]",


@ -440,7 +440,11 @@ public class ConvertSegmentTask extends AbstractFixedIntervalTask
// Appending to the version makes a new version that inherits most comparability parameters of the original
// version, but is "newer" than said original version.
DataSegment updatedSegment = segment.withVersion(StringUtils.format("%s_v%s", segment.getVersion(), outVersion));
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
// The convert segment task does not support replicas where different tasks could generate segments with the
// same identifier but potentially different contents. In case of conflict, favor the most recently pushed
// segment (replaceExisting == true).
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment, true);
actionClient.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
} else {


@ -185,7 +185,11 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
long uploadStart = System.currentTimeMillis();
// Upload file
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment);
// The merge task does not support replicas where different tasks could generate segments with the
// same identifier but potentially different contents. In case of conflict, favor the most recently pushed
// segment (replaceExisting == true).
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment, true);
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));


@ -1006,7 +1006,7 @@ public class IndexTaskTest
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
{
segments.add(segment);
return segment;


@ -201,7 +201,7 @@ public class SameIntervalMergeTaskTest
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
{
// the merged segment is pushed to storage
segments.add(segment);


@ -249,7 +249,7 @@ public class IngestSegmentFirehoseFactoryTest
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
{
return segment;
}


@ -485,7 +485,7 @@ public class TaskLifecycleTest
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
{
pushedSegments++;
return segment;
@ -1034,7 +1034,7 @@ public class TaskLifecycleTest
}
@Override
public DataSegment push(File file, DataSegment dataSegment) throws IOException
public DataSegment push(File file, DataSegment dataSegment, boolean replaceExisting) throws IOException
{
throw new RuntimeException("FAILURE");
}


@ -48,7 +48,7 @@ public class TestDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
{
pushedSegments.add(segment);
return segment;


@ -48,9 +48,38 @@ public class CompressionUtils
{
private static final Logger log = new Logger(CompressionUtils.class);
private static final int DEFAULT_RETRY_COUNT = 3;
private static final String GZ_SUFFIX = ".gz";
private static final String ZIP_SUFFIX = ".zip";
public static final String GZ_SUFFIX = ".gz";
public static final String ZIP_SUFFIX = ".zip";
/**
* Zip the contents of directory into the file indicated by outputZipFile. Sub directories are skipped
*
* @param directory The directory whose contents should be added to the zip in the output stream.
* @param outputZipFile The output file to write the zipped data to
* @param fsync True if the output file should be fsynced to disk
*
* @return The number of bytes (uncompressed) read from the input directory.
*
* @throws IOException
*/
public static long zip(File directory, File outputZipFile, boolean fsync) throws IOException
{
if (!isZip(outputZipFile.getName())) {
log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
}
try (final FileOutputStream out = new FileOutputStream(outputZipFile)) {
long bytes = zip(directory, out);
// For explanation of why fsyncing here is a good practice:
// https://github.com/druid-io/druid/pull/5187#pullrequestreview-85188984
if (fsync) {
out.getChannel().force(true);
}
return bytes;
}
}
/**
* Zip the contents of directory into the file indicated by outputZipFile. Sub directories are skipped
@ -64,20 +93,14 @@ public class CompressionUtils
*/
public static long zip(File directory, File outputZipFile) throws IOException
{
if (!isZip(outputZipFile.getName())) {
log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
}
try (final FileOutputStream out = new FileOutputStream(outputZipFile)) {
return zip(directory, out);
}
return zip(directory, outputZipFile, false);
}
/**
* Zips the contents of the input directory to the output stream. Sub directories are skipped
*
* @param directory The directory whose contents should be added to the zip in the output stream.
* @param out The output stream to write the zip data to. It is closed in the process
* @param out The output stream to write the zip data to. Caller is responsible for closing this stream.
*
* @return The number of bytes (uncompressed) read from the input directory.
*
@ -88,23 +111,23 @@ public class CompressionUtils
if (!directory.isDirectory()) {
throw new IOE("directory[%s] is not a directory", directory);
}
final File[] files = directory.listFiles();
final ZipOutputStream zipOut = new ZipOutputStream(out);
long totalSize = 0;
try (final ZipOutputStream zipOut = new ZipOutputStream(out)) {
for (File file : files) {
log.info("Adding file[%s] with size[%,d]. Total size so far[%,d]", file, file.length(), totalSize);
if (file.length() >= Integer.MAX_VALUE) {
zipOut.finish();
throw new IOE("file[%s] too large [%,d]", file, file.length());
}
zipOut.putNextEntry(new ZipEntry(file.getName()));
totalSize += Files.asByteSource(file).copyTo(zipOut);
for (File file : directory.listFiles()) {
log.info("Adding file[%s] with size[%,d]. Total size so far[%,d]", file, file.length(), totalSize);
if (file.length() >= Integer.MAX_VALUE) {
zipOut.finish();
throw new IOE("file[%s] too large [%,d]", file, file.length());
}
zipOut.closeEntry();
// Workarround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
zipOut.flush();
zipOut.putNextEntry(new ZipEntry(file.getName()));
totalSize += Files.asByteSource(file).copyTo(zipOut);
}
zipOut.closeEntry();
// Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
zipOut.flush();
zipOut.finish();
return totalSize;
}
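
A short usage sketch (illustrative paths, not from the commit) of the new fsync-aware overload; the existing two-argument zip() now simply delegates to it with fsync == false:

import io.druid.java.util.common.CompressionUtils;

import java.io.File;
import java.io.IOException;

public class ZipWithFsyncSketch
{
  public static void main(String[] args) throws IOException
  {
    final File segmentDir = new File("/tmp/druid/segment-to-push"); // placeholder directory of index files
    final File zipOutFile = new File("/tmp/druid/index.zip");       // placeholder output file

    // fsync == true forces the zipped bytes to disk before the file is moved or uploaded, so a crash
    // should not leave a partially written zip visible at the final location (see the PR discussion
    // linked in the comment above for the full rationale).
    final long uncompressedBytes = CompressionUtils.zip(segmentDir, zipOutFile, true);
    System.out.printf("Zipped %,d uncompressed bytes into %s%n", uncompressedBytes, zipOutFile);

    // Old behavior is preserved for existing callers.
    CompressionUtils.zip(segmentDir, zipOutFile);
  }
}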


@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
@ -33,23 +34,22 @@ import java.io.IOException;
import java.net.URI;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.UUID;
/**
*/
public class LocalDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(LocalDataSegmentPusher.class);
private static final String INDEX_FILENAME = "index.zip";
private static final String DESCRIPTOR_FILENAME = "descriptor.json";
private final LocalDataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
@Inject
public LocalDataSegmentPusher(
LocalDataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
public LocalDataSegmentPusher(LocalDataSegmentPusherConfig config, ObjectMapper jsonMapper)
{
this.config = config;
this.jsonMapper = jsonMapper;
@ -71,7 +71,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
}
@Override
public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException
public DataSegment push(File dataSegmentFile, DataSegment segment, boolean replaceExisting) throws IOException
{
final String storageDir = this.getStorageDir(segment);
final File baseStorageDir = config.getStorageDirectory();
@ -95,27 +95,53 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
final File tmpOutDir = new File(baseStorageDir, intermediateDirFor(storageDir));
log.info("Creating intermediate directory[%s] for segment[%s]", tmpOutDir.toString(), segment.getIdentifier());
final long size = compressSegment(dataSegmentFile, tmpOutDir);
FileUtils.forceMkdir(tmpOutDir);
final DataSegment dataSegment = createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip").toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
tmpOutDir
);
// moving the temporary directory to the final destination, once success the potentially concurrent push operations
// will be failed and will read the descriptor.json created by current push operation directly
FileUtils.forceMkdir(outDir.getParentFile());
try {
Files.move(tmpOutDir.toPath(), outDir.toPath());
final File tmpIndexFile = new File(tmpOutDir, INDEX_FILENAME);
final long size = compressSegment(dataSegmentFile, tmpIndexFile);
final File tmpDescriptorFile = new File(tmpOutDir, DESCRIPTOR_FILENAME);
DataSegment dataSegment = createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_FILENAME).toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
tmpDescriptorFile
);
FileUtils.forceMkdir(outDir);
if (replaceExisting) {
final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
final File descriptorFileTarget = new File(outDir, tmpDescriptorFile.getName());
if (!tmpIndexFile.renameTo(indexFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget);
}
if (!tmpDescriptorFile.renameTo(descriptorFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpDescriptorFile, descriptorFileTarget);
}
} else {
try {
Files.move(tmpIndexFile.toPath(), outDir.toPath().resolve(tmpIndexFile.toPath().getFileName()));
}
catch (FileAlreadyExistsException e) {
log.info("[%s] already exists at [%s], ignore if replication is configured", INDEX_FILENAME, outDir);
}
try {
Files.move(tmpDescriptorFile.toPath(), outDir.toPath().resolve(tmpDescriptorFile.toPath().getFileName()));
}
catch (FileAlreadyExistsException e) {
log.info("[%s] already exists at [%s], ignore if replication is configured", DESCRIPTOR_FILENAME, outDir);
dataSegment = jsonMapper.readValue(new File(outDir, DESCRIPTOR_FILENAME), DataSegment.class);
}
}
return dataSegment;
}
catch (FileAlreadyExistsException e) {
log.warn("Push destination directory[%s] exists, ignore this message if replication is configured.", outDir);
finally {
FileUtils.deleteDirectory(tmpOutDir);
return jsonMapper.readValue(new File(outDir, "descriptor.json"), DataSegment.class);
}
return dataSegment;
}
@Override
@ -129,21 +155,21 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
return "intermediate_pushes/" + storageDir + "." + UUID.randomUUID().toString();
}
private long compressSegment(File dataSegmentFile, File outDir) throws IOException
private long compressSegment(File dataSegmentFile, File dest) throws IOException
{
FileUtils.forceMkdir(outDir);
File outFile = new File(outDir, "index.zip");
log.info("Compressing files from[%s] to [%s]", dataSegmentFile, outFile);
return CompressionUtils.zip(dataSegmentFile, outFile);
log.info("Compressing files from[%s] to [%s]", dataSegmentFile, dest);
return CompressionUtils.zip(dataSegmentFile, dest, true);
}
private DataSegment createDescriptorFile(DataSegment segment, File outDir) throws IOException
private DataSegment createDescriptorFile(DataSegment segment, File dest) throws IOException
{
File descriptorFile = new File(outDir, "descriptor.json");
log.info("Creating descriptor file at[%s]", descriptorFile);
log.info("Creating descriptor file at[%s]", dest);
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(segment));
Files.write(
dest.toPath(), jsonMapper.writeValueAsBytes(segment), StandardOpenOption.CREATE, StandardOpenOption.SYNC
);
return segment;
}
}


@ -556,6 +556,10 @@ public class AppenderatorImpl implements Appenderator
* Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only
* be run in the single-threaded pushExecutor.
*
* Note that this calls DataSegmentPusher.push() with replaceExisting == true which is appropriate for the indexing
* tasks it is currently being used for (local indexing and Kafka indexing). If this is going to be used by an
* indexing task type that requires replaceExisting == false, this setting will need to be pushed to the caller.
*
* @param identifier sink identifier
* @param sink sink to push
*
@ -633,9 +637,18 @@ public class AppenderatorImpl implements Appenderator
// Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
final DataSegment segment = RetryUtils.retry(
// The appenderator is currently being used for the local indexing task and the Kafka indexing task. For the
// Kafka indexing task, pushers MUST overwrite any existing objects in deep storage with the same identifier
// in order to maintain exactly-once semantics. If they do not and instead favor existing objects, in case of
// failure during publishing, the indexed data may not represent the checkpointed state and data loss or
// duplication may occur. See: https://github.com/druid-io/druid/issues/5161. The local indexing task does not
// support replicas where different tasks could generate segments with the same identifier but potentially
// different contents so it is okay if existing objects are overwritten. In both of these cases, we want to
// favor the most recently pushed segment so replaceExisting == true.
() -> dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
true
),
exception -> exception instanceof Exception,
5


@ -444,9 +444,17 @@ public class RealtimePlumber implements Plumber
log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
// The realtime plumber can generate segments with the same identifier (i.e. replica tasks) but does not
// have any strict requirement that the contents of these segments be identical. It is possible that two
// tasks generate a segment with the same identifier containing different data, and in this situation we
// want to favor the data from the task which pushed first. This is because it is possible that one
// historical could load the segment after the first task pushed and another historical load the same
// segment after the second task pushed. If the second task's segment overwrote the first one, the second
// historical node would be serving different data from the first. Hence set replaceExisting == false.
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
false
);
log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
segmentPublisher.publishSegment(segment);


@ -23,11 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.TestHelper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -54,7 +56,18 @@ public class LocalDataSegmentPusherTest
Intervals.utc(0, 1),
"v1",
null,
ImmutableList.of("dim1"),
null,
NoneShardSpec.instance(),
null,
-1
);
DataSegment dataSegment2 = new DataSegment(
"ds",
Intervals.utc(0, 1),
"v1",
null,
ImmutableList.of("dim2"),
null,
NoneShardSpec.instance(),
null,
@ -79,8 +92,8 @@ public class LocalDataSegmentPusherTest
*/
final DataSegment dataSegment2 = dataSegment.withVersion("v2");
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment);
DataSegment returnSegment2 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment2);
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
DataSegment returnSegment2 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment2, true);
Assert.assertNotNull(returnSegment1);
Assert.assertEquals(dataSegment, returnSegment1);
@ -106,14 +119,45 @@ public class LocalDataSegmentPusherTest
}
@Test
public void testFirstPushWinsForConcurrentPushes() throws IOException
public void testFirstPushWinsForConcurrentPushesWhenReplaceExistingFalse() throws IOException
{
File replicatedDataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment);
DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment);
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment2, false);
Assert.assertEquals(returnSegment1, returnSegment2);
Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
Assert.assertEquals(dataSegment.getDimensions(), returnSegment2.getDimensions());
File unzipDir = new File(config.storageDirectory, "unzip");
FileUtils.forceMkdir(unzipDir);
CompressionUtils.unzip(
new File(config.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
unzipDir
);
Assert.assertEquals(0x9, Ints.fromByteArray(Files.toByteArray(new File(unzipDir, "version.bin"))));
}
@Test
public void testLastPushWinsForConcurrentPushesWhenReplaceExistingTrue() throws IOException
{
File replicatedDataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment2, true);
Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions());
File unzipDir = new File(config.storageDirectory, "unzip");
FileUtils.forceMkdir(unzipDir);
CompressionUtils.unzip(
new File(config.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
unzipDir
);
Assert.assertEquals(0x8, Ints.fromByteArray(Files.toByteArray(new File(unzipDir, "version.bin"))));
}
@Test
@ -124,7 +168,7 @@ public class LocalDataSegmentPusherTest
config.storageDirectory = new File(config.storageDirectory, "xxx");
Assert.assertTrue(config.storageDirectory.mkdir());
config.storageDirectory.setWritable(false);
localDataSegmentPusher.push(dataSegmentFiles, dataSegment);
localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
}
@Test


@ -193,7 +193,7 @@ public class AppenderatorTester implements AutoCloseable
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
{
if (enablePushFailure && mustFail) {
mustFail = false;


@ -155,7 +155,7 @@ public class CliRealtimeExample extends ServerRunnable
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
{
return segment;
}