Zero-copy local deep storage. (#13394)

* Zero-copy local deep storage.

This is useful for local deep storage, since it reduces disk usage and
makes Historicals able to load segments instantaneously.

Two changes:

1) Introduce "druid.storage.zip" parameter for local storage, which defaults
   to false. This changes default behavior from writing an index.zip to writing
   a regular directory. This is safe to do even during a rolling update, because
   the older code actually already handled unzipped directories being present
   on local deep storage.

2) In LocalDataSegmentPuller and LocalDataSegmentPusher, use hard links
   instead of copies when possible. (Generally this is possible when the
   source and destination directory are on the same filesystem.)
This commit is contained in:
Gian Merlino 2022-12-12 17:28:24 -08:00 committed by GitHub
parent 8e386072e9
commit de5a4bafcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 446 additions and 106 deletions

View File

@ -53,6 +53,12 @@ import java.util.UUID;
public class FileUtils
{
public enum LinkOrCopyResult
{
LINK,
COPY
}
/**
* Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions
*/
@ -461,6 +467,26 @@ public class FileUtils
org.apache.commons.io.FileUtils.deleteDirectory(directory);
}
/**
* Hard-link "src" as "dest", if possible. If not possible -- perhaps they are on separate filesystems -- then
* copy "src" to "dest".
*
* @return whether a link or copy was made. Can be safely ignored if you don't care.
*
* @throws IOException if something went wrong
*/
public static LinkOrCopyResult linkOrCopy(final File src, final File dest) throws IOException
{
try {
Files.createLink(dest.toPath(), src.toPath());
return LinkOrCopyResult.LINK;
}
catch (IOException e) {
Files.copy(src.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING);
return LinkOrCopyResult.COPY;
}
}
public interface OutputStreamConsumer<T>
{
T apply(OutputStream outputStream) throws IOException;

View File

@ -244,4 +244,45 @@ public class FileUtilsTest
Assert.assertEquals(data.length(), result);
Assert.assertEquals(data, StringUtils.fromUtf8(Files.readAllBytes(dstFile.toPath())));
}
@Test
public void testLinkOrCopy1() throws IOException
{
// Will be a LINK.
final File fromFile = temporaryFolder.newFile();
final File toDir = temporaryFolder.newFolder();
final File toFile = new File(toDir, "toFile");
Files.write(fromFile.toPath(), StringUtils.toUtf8("foo"));
final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(fromFile, toFile);
// Verify the new link.
Assert.assertEquals(FileUtils.LinkOrCopyResult.LINK, linkOrCopyResult);
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
// Verify they are actually the same file.
Files.write(fromFile.toPath(), StringUtils.toUtf8("bar"));
Assert.assertEquals("bar", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
}
@Test
public void testLinkOrCopy2() throws IOException
{
// Will be a COPY, because the destination file already exists and therefore Files.createLink fails.
final File fromFile = temporaryFolder.newFile();
final File toFile = temporaryFolder.newFile();
Files.write(fromFile.toPath(), StringUtils.toUtf8("foo"));
final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(fromFile, toFile);
// Verify the new link.
Assert.assertEquals(FileUtils.LinkOrCopyResult.COPY, linkOrCopyResult);
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
// Verify they are not the same file.
Files.write(fromFile.toPath(), StringUtils.toUtf8("bar"));
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
}
}

View File

@ -25,29 +25,52 @@ title: "Deep storage"
Deep storage is where segments are stored. It is a storage mechanism that Apache Druid does not provide. This deep storage infrastructure defines the level of durability of your data, as long as Druid processes can see this storage infrastructure and get at the segments stored on it, you will not lose data no matter how many Druid nodes you lose. If segments disappear from this storage layer, then you will lose whatever data those segments represented.
## Local Mount
## Local
A local mount can be used for storage of segments as well. This allows you to use just your local file system or anything else that can be mount locally like NFS, Ceph, etc. This is the default deep storage implementation.
Local storage is intended for use in the following situations:
In order to use a local mount for deep storage, you need to set the following configuration in your common configs.
- You have just one server.
- Or, you have multiple servers, and they all have access to a shared filesystem (for example: NFS).
In multi-server production clusters, rather than local storage with a shared filesystem, it is instead recommended to
use cloud-based deep storage ([Amazon S3](#amazon-s3-or-s3-compatible), [Google Cloud Storage](#google-cloud-storage),
or [Azure Blob Storage](#azure-blob-storage)), S3-compatible storage (like Minio), or [HDFS](#hdfs). These options are
generally more convenient, more scalable, and more robust than setting up a shared filesystem.
The following configurations in `common.runtime.properties` apply to local storage:
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.storage.type`|local||Must be set.|
|`druid.storage.storageDirectory`||Directory for storing segments.|Must be set.|
|`druid.storage.type`|`local`||Must be set.|
|`druid.storage.storageDirectory`|any local directory|Directory for storing segments. Must be different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`.|`/tmp/druid/localStorage`|
|`druid.storage.zip`|`true`, `false`|Whether segments in `druid.storage.storageDirectory` are written as directories (`false`) or zip files (`true`).|`false`|
Note that you should generally set `druid.storage.storageDirectory` to something different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`.
For example:
If you are using the Hadoop indexer in local mode, then just give it a local file as your output directory and it will work.
```
druid.storage.type=local
druid.storage.storageDirectory=/tmp/druid/localStorage
```
## S3-compatible
The `druid.storage.storageDirectory` must be set to a different path than `druid.segmentCache.locations` or
`druid.segmentCache.infoDir`.
See [druid-s3-extensions extension documentation](../development/extensions-core/s3.md).
## Amazon S3 or S3-compatible
See [`druid-s3-extensions`](../development/extensions-core/s3.md).
## Google Cloud Storage
See [`druid-google-extensions`](../development/extensions-core/google.md).
## Azure Blob Storage
See [`druid-azure-extensions`](../development/extensions-core/azure.md).
## HDFS
See [druid-hdfs-storage extension documentation](../development/extensions-core/hdfs.md).
## Additional Deep Stores
## Additional options
For additional deep stores, please see our [extensions list](../development/extensions.md).
For additional deep storage options, please see our [extensions list](../development/extensions.md).

View File

@ -54,7 +54,6 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
private static final Logger log = new Logger(HdfsDataSegmentPusher.class);
private final Configuration hadoopConfig;
private final ObjectMapper jsonMapper;
// We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA.
// Please see https://github.com/apache/druid/pull/5684
@ -68,7 +67,6 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
)
{
this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper;
Path storageDir = new Path(config.getStorageDirectory());
this.fullyQualifiedStorageDirectory = Suppliers.memoize(
() -> {

View File

@ -3159,6 +3159,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
dataSegmentPusherConfig.zip = true;
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
toolboxFactory = new TaskToolboxFactory(

View File

@ -3124,6 +3124,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
dataSegmentPusherConfig.zip = true;
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
toolboxFactory = new TaskToolboxFactory(

View File

@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
@ -162,6 +163,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -943,7 +945,8 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
List<File> segmentFiles = new ArrayList<>();
for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) {
File file = new File((String) segment.getLoadSpec().get("path"));
FileUtils.mkdirp(file);
FileUtils.mkdirp(file.getParentFile());
Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
segmentFiles.add(file);
}

View File

@ -29,6 +29,7 @@ import java.io.File;
import java.io.IOException;
/**
*
*/
public class LocalDataSegmentKiller implements DataSegmentKiller
{
@ -51,9 +52,12 @@ public class LocalDataSegmentKiller implements DataSegmentKiller
log.info("Deleting segment[%s] from directory[%s].", segment.getId(), path);
try {
if (path.getName().endsWith(".zip")) {
if ((path.getName().endsWith(".zip") && path.isFile()) ||
(path.getName().equals(LocalDataSegmentPusher.INDEX_DIR) && path.isDirectory())) {
// path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
// or .../dataSource/interval/version/partitionNum/UUID/xxx.zip
// or -- > .../dataSource/interval/version/partitionNum/index/
// or .../dataSource/interval/version/partitionNum/UUID/index/
File parentDir = path.getParentFile();
FileUtils.deleteDirectory(parentDir);
@ -62,13 +66,18 @@ public class LocalDataSegmentKiller implements DataSegmentKiller
parentDir = parentDir.getParentFile();
int maxDepth = 4; // if for some reason there's no datasSource directory, stop recursing somewhere reasonable
while (parentDir != null && --maxDepth >= 0) {
if (!parentDir.delete() || segment.getDataSource().equals(parentDir.getName())) {
// parentDir.listFiles().length > 0 check not strictly necessary, because parentDir.delete() fails on
// nonempty directories. However, including it here is nice since it makes our intent very clear (only
// remove nonempty directories) and it prevents making delete syscalls that are doomed to failure.
if (parentDir.listFiles().length > 0
|| !parentDir.delete()
|| segment.getDataSource().equals(parentDir.getName())) {
break;
}
parentDir = parentDir.getParentFile();
}
} else {
} else if (path.exists()) {
throw new SegmentLoadingException("Unknown file type[%s]", path);
}
}

View File

@ -43,6 +43,7 @@ import java.net.URI;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CancellationException;
@ -125,40 +126,43 @@ public class LocalDataSegmentPuller implements URIDataPuller
public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException
{
if (sourceFile.isDirectory()) {
if (sourceFile.equals(dir)) {
log.info("Asked to load [%s] into itself, done!", dir);
return new FileUtils.FileCopyResult(sourceFile);
}
final File[] files = sourceFile.listFiles();
if (files == null) {
throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath());
}
final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(sourceFile);
for (final File oldFile : files) {
if (oldFile.isDirectory()) {
log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath());
continue;
try {
final File[] files = sourceFile.listFiles();
if (files == null) {
throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath());
}
result.addFiles(
FileUtils.retryCopy(
Files.asByteSource(oldFile),
new File(dir, oldFile.getName()),
shouldRetryPredicate(),
DEFAULT_RETRY_COUNT
).getFiles()
if (sourceFile.equals(dir)) {
log.info("Asked to load [%s] into itself, done!", dir);
return new FileUtils.FileCopyResult(Arrays.asList(files));
}
final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
boolean link = true;
for (final File oldFile : files) {
if (oldFile.isDirectory()) {
log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath());
continue;
}
final File newFile = new File(dir, oldFile.getName());
final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(oldFile, newFile);
link = link && linkOrCopyResult == FileUtils.LinkOrCopyResult.LINK;
result.addFile(newFile);
}
log.info(
"%s %d bytes from [%s] to [%s]",
link ? "Linked" : "Copied",
result.size(),
sourceFile.getAbsolutePath(),
dir.getAbsolutePath()
);
return result;
}
log.info(
"Copied %d bytes from [%s] to [%s]",
result.size(),
sourceFile.getAbsolutePath(),
dir.getAbsolutePath()
);
return result;
}
if (CompressionUtils.isZip(sourceFile.getName())) {
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to load from local directory [%s]", sourceFile.getAbsolutePath());
}
} else if (CompressionUtils.isZip(sourceFile.getName())) {
try {
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
Files.asByteSource(sourceFile),
@ -177,8 +181,7 @@ public class LocalDataSegmentPuller implements URIDataPuller
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to unzip file [%s]", sourceFile.getAbsolutePath());
}
}
if (CompressionUtils.isGz(sourceFile.getName())) {
} else if (CompressionUtils.isGz(sourceFile.getName())) {
final File outFile = new File(dir, CompressionUtils.getGzBaseName(sourceFile.getName()));
final FileUtils.FileCopyResult result = CompressionUtils.gunzip(
Files.asByteSource(sourceFile),
@ -192,8 +195,9 @@ public class LocalDataSegmentPuller implements URIDataPuller
outFile.getAbsolutePath()
);
return result;
} else {
throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath());
}
throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath());
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
@ -31,6 +32,8 @@ import org.apache.druid.utils.CompressionUtils;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.UUID;
@ -38,7 +41,8 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(LocalDataSegmentPusher.class);
private static final String INDEX_FILENAME = "index.zip";
public static final String INDEX_DIR = "index";
public static final String INDEX_ZIP_FILENAME = "index.zip";
private final LocalDataSegmentPusherConfig config;
@ -76,7 +80,11 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
log.debug("Copying segment[%s] to local filesystem at location[%s]", segment.getId(), outDir.toString());
// Add binary version to the DataSegment object.
segment = segment.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
if (dataSegmentFile.equals(outDir)) {
// Input and output directories are the same. Compute size, build a loadSpec, and return.
long size = 0;
for (File file : dataSegmentFile.listFiles()) {
size += file.length();
@ -85,31 +93,10 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
return segment.withLoadSpec(makeLoadSpec(outDir.toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
}
final File tmpOutDir = new File(config.getStorageDirectory(), makeIntermediateDir());
log.debug("Creating intermediate directory[%s] for segment[%s].", tmpOutDir.toString(), segment.getId());
FileUtils.mkdirp(tmpOutDir);
try {
final File tmpIndexFile = new File(tmpOutDir, INDEX_FILENAME);
final long size = compressSegment(dataSegmentFile, tmpIndexFile);
final DataSegment dataSegment = segment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_FILENAME).toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
FileUtils.mkdirp(outDir);
final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
if (!tmpIndexFile.renameTo(indexFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget);
}
return dataSegment;
}
finally {
FileUtils.deleteDirectory(tmpOutDir);
} else if (config.isZip()) {
return pushZip(dataSegmentFile, outDir, segment);
} else {
return pushNoZip(dataSegmentFile, outDir, segment);
}
}
@ -124,9 +111,80 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
return "intermediate_pushes/" + UUID.randomUUID();
}
private long compressSegment(File dataSegmentFile, File dest) throws IOException
private DataSegment pushZip(final File inDir, final File outDir, final DataSegment baseSegment) throws IOException
{
log.debug("Compressing files from[%s] to [%s]", dataSegmentFile, dest);
return CompressionUtils.zip(dataSegmentFile, dest, true);
final File tmpSegmentDir = new File(config.getStorageDirectory(), makeIntermediateDir());
final File tmpIndexFile = new File(tmpSegmentDir, INDEX_ZIP_FILENAME);
log.debug("Creating intermediate directory[%s] for segment[%s].", tmpSegmentDir.toString(), baseSegment.getId());
FileUtils.mkdirp(tmpSegmentDir);
try {
log.debug("Compressing files from[%s] to [%s]", inDir, tmpIndexFile);
final long size = CompressionUtils.zip(inDir, tmpIndexFile, true);
FileUtils.mkdirp(outDir);
final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
if (!tmpIndexFile.renameTo(indexFileTarget)) {
throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget);
}
return baseSegment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_ZIP_FILENAME).toURI()))
.withSize(size);
}
finally {
FileUtils.deleteDirectory(tmpSegmentDir);
}
}
private DataSegment pushNoZip(final File inDir, final File outDir, final DataSegment baseSegment) throws IOException
{
final File tmpSegmentDir = new File(config.getStorageDirectory(), makeIntermediateDir());
FileUtils.mkdirp(tmpSegmentDir);
try {
final File[] files = inDir.listFiles();
if (files == null) {
throw new IOE("Cannot list directory [%s]", inDir);
}
long size = 0;
for (final File file : files) {
if (file.isFile()) {
size += file.length();
FileUtils.linkOrCopy(file, new File(tmpSegmentDir, file.getName()));
} else {
// Segment directories are expected to be flat.
throw new IOE("Unexpected subdirectory [%s]", file.getName());
}
}
final File segmentDir = new File(outDir, INDEX_DIR);
FileUtils.mkdirp(outDir);
try {
Files.move(tmpSegmentDir.toPath(), segmentDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
catch (IOException e) {
if (segmentDir.exists()) {
// Move old directory out of the way, then try again. This makes the latest push win when we push to the
// same directory twice, so behavior is compatible with the zip style of pushing.
Files.move(
segmentDir.toPath(),
new File(outDir, StringUtils.format("%s_old_%s", INDEX_DIR, UUID.randomUUID())).toPath(),
StandardCopyOption.ATOMIC_MOVE
);
Files.move(tmpSegmentDir.toPath(), segmentDir.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
}
return baseSegment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_DIR).toURI()))
.withSize(size);
}
finally {
FileUtils.deleteDirectory(tmpSegmentDir);
}
}
}

View File

@ -30,8 +30,16 @@ public class LocalDataSegmentPusherConfig
@JsonProperty
public File storageDirectory = new File("/tmp/druid/localStorage");
@JsonProperty
public boolean zip = false;
public File getStorageDirectory()
{
return storageDirectory;
}
public boolean isZip()
{
return zip;
}
}

View File

@ -29,13 +29,30 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
@RunWith(Parameterized.class)
public class LocalDataSegmentKillerTest
{
private static final String DATASOURCE_NAME = "ds";
private final boolean zip;
public LocalDataSegmentKillerTest(boolean zip)
{
this.zip = zip;
}
@Parameterized.Parameters(name = "zip = {0}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{false}, new Object[]{true});
}
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -46,12 +63,12 @@ public class LocalDataSegmentKillerTest
LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig());
// Create following segments and then delete them in this order and assert directory deletions
// /tmp/dataSource/interval1/v1/0/index.zip
// /tmp/dataSource/interval1/v1/1/index.zip
// /tmp/dataSource/interval1/v2/0/index.zip
// /tmp/dataSource/interval2/v1/0/index.zip
// /tmp/dataSource/interval1/v1/0/
// /tmp/dataSource/interval1/v1/1/
// /tmp/dataSource/interval1/v2/0/
// /tmp/dataSource/interval2/v1/0/
final File dataSourceDir = temporaryFolder.newFolder();
final File dataSourceDir = temporaryFolder.newFolder(DATASOURCE_NAME);
File interval1Dir = new File(dataSourceDir, "interval1");
File version11Dir = new File(interval1Dir, "v1");
@ -72,27 +89,28 @@ public class LocalDataSegmentKillerTest
makePartitionDirWithIndex(partition012Dir);
killer.kill(getSegmentWithPath(new File(partition011Dir, "index.zip").toString()));
killer.kill(getSegmentWithPath(partition011Dir));
Assert.assertFalse(partition011Dir.exists());
Assert.assertTrue(partition111Dir.exists());
Assert.assertTrue(partition021Dir.exists());
Assert.assertTrue(partition012Dir.exists());
killer.kill(getSegmentWithPath(new File(partition111Dir, "index.zip").toString()));
killer.kill(getSegmentWithPath(partition111Dir));
Assert.assertFalse(version11Dir.exists());
Assert.assertTrue(partition021Dir.exists());
Assert.assertTrue(partition012Dir.exists());
killer.kill(getSegmentWithPath(new File(partition021Dir, "index.zip").toString()));
killer.kill(getSegmentWithPath(partition021Dir));
Assert.assertFalse(interval1Dir.exists());
Assert.assertTrue(partition012Dir.exists());
killer.kill(getSegmentWithPath(new File(partition012Dir, "index.zip").toString()));
killer.kill(getSegmentWithPath(partition012Dir));
Assert.assertFalse(dataSourceDir.exists());
Assert.assertTrue(dataSourceDir.getParentFile().exists());
}
@Test
@ -100,7 +118,8 @@ public class LocalDataSegmentKillerTest
{
final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig());
final String uuid = UUID.randomUUID().toString().substring(0, 5);
final File dataSourceDir = temporaryFolder.newFolder("dataSource");
final File emptyParentDir = temporaryFolder.newFolder();
final File dataSourceDir = new File(emptyParentDir, DATASOURCE_NAME);
final File intervalDir = new File(dataSourceDir, "interval");
final File versionDir = new File(intervalDir, "1");
final File partitionDir = new File(versionDir, "0");
@ -108,30 +127,69 @@ public class LocalDataSegmentKillerTest
makePartitionDirWithIndex(uuidDir);
killer.kill(getSegmentWithPath(new File(uuidDir, "index.zip").toString()));
killer.kill(getSegmentWithPath(uuidDir));
Assert.assertFalse(uuidDir.exists());
Assert.assertFalse(partitionDir.exists());
Assert.assertFalse(versionDir.exists());
Assert.assertFalse(intervalDir.exists());
Assert.assertFalse(dataSourceDir.exists());
// Verify that we stop after the datasource dir, even though the parent is empty.
Assert.assertTrue(emptyParentDir.exists());
Assert.assertEquals(0, emptyParentDir.listFiles().length);
}
@Test
public void testKillUniquePathWrongDataSourceNameInDirectory() throws Exception
{
// Verify that
final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig());
final String uuid = UUID.randomUUID().toString().substring(0, 5);
final File emptyParentDir = temporaryFolder.newFolder();
final File dataSourceDir = new File(emptyParentDir, DATASOURCE_NAME + "_wrong");
final File intervalDir = new File(dataSourceDir, "interval");
final File versionDir = new File(intervalDir, "1");
final File partitionDir = new File(versionDir, "0");
final File uuidDir = new File(partitionDir, uuid);
makePartitionDirWithIndex(uuidDir);
killer.kill(getSegmentWithPath(uuidDir));
Assert.assertFalse(uuidDir.exists());
Assert.assertFalse(partitionDir.exists());
Assert.assertFalse(versionDir.exists());
Assert.assertFalse(intervalDir.exists());
Assert.assertFalse(dataSourceDir.exists());
// Verify that we stop at 4 pruned paths, even if we don't encounter the datasource-named directory.
Assert.assertTrue(emptyParentDir.exists());
Assert.assertEquals(0, emptyParentDir.listFiles().length);
}
private void makePartitionDirWithIndex(File path) throws IOException
{
FileUtils.mkdirp(path);
Assert.assertTrue(new File(path, "index.zip").createNewFile());
if (zip) {
Assert.assertTrue(new File(path, LocalDataSegmentPusher.INDEX_ZIP_FILENAME).createNewFile());
} else {
Assert.assertTrue(new File(path, LocalDataSegmentPusher.INDEX_DIR).mkdir());
}
}
private DataSegment getSegmentWithPath(String path)
private DataSegment getSegmentWithPath(File baseDirectory)
{
final String fileName = zip ? LocalDataSegmentPusher.INDEX_ZIP_FILENAME : LocalDataSegmentPusher.INDEX_DIR;
final File path = new File(baseDirectory, fileName);
return new DataSegment(
"dataSource",
DATASOURCE_NAME,
Intervals.of("2000/3000"),
"ver",
ImmutableMap.of(
"type", "local",
"path", path
"path", path.toURI().getPath()
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),

View File

@ -48,7 +48,9 @@ public class LocalDataSegmentPusherTest
public ExpectedException exception = ExpectedException.none();
LocalDataSegmentPusher localDataSegmentPusher;
LocalDataSegmentPusher localDataSegmentPusherZip;
LocalDataSegmentPusherConfig config;
LocalDataSegmentPusherConfig configZip;
File dataSegmentFiles;
DataSegment dataSegment = new DataSegment(
"ds",
@ -77,14 +79,53 @@ public class LocalDataSegmentPusherTest
public void setUp() throws IOException
{
config = new LocalDataSegmentPusherConfig();
config.zip = false;
config.storageDirectory = temporaryFolder.newFolder();
localDataSegmentPusher = new LocalDataSegmentPusher(config);
configZip = new LocalDataSegmentPusherConfig();
configZip.zip = true;
configZip.storageDirectory = temporaryFolder.newFolder();
localDataSegmentPusherZip = new LocalDataSegmentPusher(configZip);
dataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(dataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x9));
}
@Test
public void testPush() throws IOException
public void testPushZip() throws IOException
{
/* DataSegment - Used to create LoadSpec and Create outDir (Local Deep Storage location in this case)
File dataSegmentFile - Used to get location of segment files like version.bin, meta.smoosh and xxxxx.smoosh
*/
final DataSegment dataSegment2 = dataSegment.withVersion("v2");
DataSegment returnSegment1 = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false);
DataSegment returnSegment2 = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment2, false);
Assert.assertNotNull(returnSegment1);
Assert.assertEquals(dataSegment, returnSegment1);
Assert.assertNotNull(returnSegment2);
Assert.assertEquals(dataSegment2, returnSegment2);
Assert.assertNotEquals(
localDataSegmentPusherZip.getStorageDir(dataSegment, false),
localDataSegmentPusherZip.getStorageDir(dataSegment2, false)
);
for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) {
File outDir = new File(
configZip.getStorageDirectory(),
localDataSegmentPusherZip.getStorageDir(returnSegment, false)
);
File versionFile = new File(outDir, "index.zip");
Assert.assertTrue(versionFile.exists());
}
}
@Test
public void testPushNoZip() throws IOException
{
/* DataSegment - Used to create LoadSpec and Create outDir (Local Deep Storage location in this case)
File dataSegmentFile - Used to get location of segment files like version.bin, meta.smoosh and xxxxx.smoosh
@ -107,19 +148,43 @@ public class LocalDataSegmentPusherTest
for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) {
File outDir = new File(
config.getStorageDirectory(),
localDataSegmentPusher.getStorageDir(returnSegment, false)
new File(
config.getStorageDirectory(),
localDataSegmentPusher.getStorageDir(returnSegment, false)
),
"index"
);
File versionFile = new File(outDir, "index.zip");
// Check against loadSpec.
Assert.assertEquals(
outDir.toURI().getPath(),
returnSegment.getLoadSpec().get("path")
);
// Check for version.bin.
File versionFile = new File(outDir, "version.bin");
Assert.assertTrue(versionFile.exists());
}
}
@Test
public void testPushUseUniquePath() throws IOException
public void testPushNoZipUseUniquePath() throws IOException
{
DataSegment segment = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
String path = segment.getLoadSpec().get("path").toString();
Pattern pattern = Pattern.compile(
".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index/$"
);
Assert.assertTrue(path, pattern.matcher(path).matches());
Assert.assertTrue(new File(path).exists());
}
@Test
public void testPushZipUseUniquePath() throws IOException
{
DataSegment segment = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, true);
String path = segment.getLoadSpec().get("path").toString();
Pattern pattern = Pattern.compile(
".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index\\.zip"
@ -129,8 +194,12 @@ public class LocalDataSegmentPusherTest
}
@Test
public void testLastPushWinsForConcurrentPushes() throws IOException
public void testLastPushWinsForConcurrentNoZipPushes() throws IOException
{
// Behavioral difference between zip and no-zip pushes when the same segment identifier is pushed twice:
// Later zip pushes overwrite earlier ones. Later no-zip pushes throw errors. In situations where the same
// segment may be pushed twice, we expect "useUniquePath" to be set on the pusher.
File replicatedDataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
@ -139,10 +208,38 @@ public class LocalDataSegmentPusherTest
Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions());
File unzipDir = new File(config.storageDirectory, "unzip");
final String expectedPath = StringUtils.format(
"%s/%s",
config.storageDirectory,
"ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index/"
);
Assert.assertEquals(expectedPath, returnSegment1.getLoadSpec().get("path"));
Assert.assertEquals(expectedPath, returnSegment2.getLoadSpec().get("path"));
final File versionFile = new File(expectedPath, "version.bin");
Assert.assertEquals(0x8, Ints.fromByteArray(Files.toByteArray(versionFile)));
}
@Test
public void testLastPushWinsForConcurrentZipPushes() throws IOException
{
// Behavioral difference between zip and no-zip pushes when the same segment identifier is pushed twice:
// Later zip pushes overwrite earlier ones. Later no-zip pushes throw errors. In situations where the same
// segment may be pushed twice, we expect "useUniquePath" to be set on the pusher.
File replicatedDataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
DataSegment returnSegment1 = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false);
DataSegment returnSegment2 = localDataSegmentPusherZip.push(replicatedDataSegmentFiles, dataSegment2, false);
Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions());
File unzipDir = new File(configZip.storageDirectory, "unzip");
FileUtils.mkdirp(unzipDir);
CompressionUtils.unzip(
new File(config.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
new File(configZip.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
unzipDir
);
@ -160,27 +257,38 @@ public class LocalDataSegmentPusherTest
localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
}
@Test
public void testPushZipCannotCreateDirectory() throws IOException
{
exception.expect(IOException.class);
exception.expectMessage("Cannot create directory");
configZip.storageDirectory = new File(configZip.storageDirectory, "xxx");
Assert.assertTrue(configZip.storageDirectory.mkdir());
configZip.storageDirectory.setWritable(false);
localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false);
}
@Test
public void testPathForHadoopAbsolute()
{
config.storageDirectory = new File("/druid");
configZip.storageDirectory = new File("/druid");
// If this test fails because the path is returned as "file:/druid/", this can happen
// when a /druid directory exists on the local filesystem.
Assert.assertEquals(
"file:/druid",
new LocalDataSegmentPusher(config).getPathForHadoop()
new LocalDataSegmentPusher(configZip).getPathForHadoop()
);
}
@Test
public void testPathForHadoopRelative()
{
config.storageDirectory = new File("druid");
configZip.storageDirectory = new File("druid");
Assert.assertEquals(
StringUtils.format("file:%s/druid", System.getProperty("user.dir")),
new LocalDataSegmentPusher(config).getPathForHadoop()
new LocalDataSegmentPusher(configZip).getPathForHadoop()
);
}
}

View File

@ -739,6 +739,8 @@ appenders
druid-hdfs-storage
druid-s3-extensions
druid.sql.planner.maxNumericInFilters
Minio
multi-server
- ../docs/dependencies/metadata-storage.md
BasicDataSource
- ../docs/dependencies/zookeeper.md