TmpFileIOPeons to create files under the merging output directory, instead of java.io.tmpdir (#3990)

* In IndexMerger and IndexMergerV9, create temporary files under the output directory/tmpPeonFiles, instead of java.io.tmpdir

* Use FileUtils.forceMkdir() across the codebase and remove some unused code

* Fix test

* Fix PullDependencies.run()

* Unused import
This commit is contained in:
Roman Leventov 2017-03-02 16:05:12 -06:00 committed by Gian Merlino
parent ea1f5b7954
commit 81a5f9851f
16 changed files with 87 additions and 243 deletions

View File

@ -19,11 +19,10 @@
package io.druid.storage.azure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentPuller;
@ -56,9 +55,8 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
)
throws SegmentLoadingException
{
prepareOutDir(outDir);
try {
prepareOutDir(outDir);
final ByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);
final io.druid.java.util.common.FileUtils.FileCopyResult result = CompressionUtils.unzip(
@ -99,15 +97,9 @@ public class AzureDataSegmentPuller implements DataSegmentPuller
getSegmentFiles(containerName, blobPath, outDir);
}
public void prepareOutDir(final File outDir) throws ISE
@VisibleForTesting
void prepareOutDir(final File outDir) throws IOException
{
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("[%s] must be a directory.", outDir);
}
FileUtils.forceMkdir(outDir);
}
}

View File

@ -19,11 +19,11 @@
package io.druid.storage.google;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.inject.Inject;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.FileUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentPuller;
@ -64,9 +64,9 @@ public class GoogleDataSegmentPuller implements DataSegmentPuller, URIDataPuller
{
LOG.info("Pulling index at path[%s] to outDir[%s]", bucket, path, outDir.getAbsolutePath());
prepareOutDir(outDir);
try {
prepareOutDir(outDir);
final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, path);
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
byteSource,
@ -91,16 +91,10 @@ public class GoogleDataSegmentPuller implements DataSegmentPuller, URIDataPuller
}
}
// Needs to be public for the tests.
public void prepareOutDir(final File outDir) throws ISE
@VisibleForTesting
void prepareOutDir(final File outDir) throws IOException
{
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("outDir[%s] must be a directory.", outDir);
}
org.apache.commons.io.FileUtils.forceMkdir(outDir);
}
@Override

View File

@ -25,11 +25,9 @@ import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.inject.Inject;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.FileUtils;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.UOE;
@ -38,6 +36,12 @@ import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.URIDataPuller;
import io.druid.timeline.DataSegment;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.StorageObject;
import javax.tools.FileObject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -47,11 +51,6 @@ import java.io.Writer;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.tools.FileObject;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.StorageObject;
/**
* A data segment puller that also handles URI data pulls.
@ -176,15 +175,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller, URIDataPuller
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
}
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("outDir[%s] must be a directory.", outDir);
}
try {
org.apache.commons.io.FileUtils.forceMkdir(outDir);
final URI uri = URI.create(String.format("s3://%s/%s", s3Coords.bucket, s3Coords.path));
final ByteSource byteSource = new ByteSource()
{

View File

@ -528,9 +528,7 @@ public class HadoopConverterJob
context.setStatus("CONVERTING");
context.progress();
final File outDir = new File(tmpDir, "out");
if (!outDir.mkdir() && (!outDir.exists() || !outDir.isDirectory())) {
throw new IOException(String.format("Could not create output directory [%s]", outDir));
}
FileUtils.forceMkdir(outDir);
HadoopDruidConverterConfig.INDEX_MERGER.convert(
inDir,
outDir,

View File

@ -25,75 +25,15 @@ import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
/**
*/
public class
StreamUtils
public class StreamUtils
{
// The default buffer size to use (from IOUtils)
private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
/**
 * Copy from an input stream to a file (and buffer it) and close the input stream.
 * <p>
 * It is highly recommended to use FileUtils.retryCopy whenever possible, and not use a raw `InputStream`
 *
 * @param is   The input stream to copy bytes from. `is` is closed regardless of the copy result.
 * @param file The file to copy bytes to. Any parent directories are automatically created.
 *
 * @return The count of bytes written to the file
 *
 * @throws IOException if the file cannot be opened or written, or if closing `is` fails
 */
public static long copyToFileAndClose(InputStream is, File file) throws IOException
{
  // Directory preparation happens inside the try so that `is` is closed on every exit path.
  try {
    final File parentDir = file.getParentFile();
    // getParentFile() returns null when the path has no parent component (e.g. new File("out.bin")).
    if (parentDir != null) {
      parentDir.mkdirs();
    }
    try (OutputStream os = new BufferedOutputStream(new FileOutputStream(file))) {
      final long result = ByteStreams.copy(is, os);
      // Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
      os.flush();
      return result;
    }
  }
  finally {
    is.close();
  }
}
/**
 * Copy bytes from `is` to `file` but timeout if the copy takes too long. The timeout is best effort and not
 * guaranteed. Specifically, `is.read` will not be interrupted.
 *
 * @param is      The `InputStream` to copy bytes from. It is closed regardless of copy results.
 * @param file    The `File` to copy bytes to
 * @param timeout The timeout (in ms) of the copy.
 *
 * @return The size of bytes written to `file`
 *
 * @throws IOException      if the file cannot be opened or written, or if closing `is` fails
 * @throws TimeoutException If `timeout` is exceeded
 */
public static long copyToFileAndClose(InputStream is, File file, long timeout) throws IOException, TimeoutException
{
  // Directory preparation happens inside the try so that `is` is closed on every exit path.
  try {
    final File parentDir = file.getParentFile();
    // getParentFile() returns null when the path has no parent component (e.g. new File("out.bin")).
    if (parentDir != null) {
      parentDir.mkdirs();
    }
    try (OutputStream os = new BufferedOutputStream(new FileOutputStream(file))) {
      final long retval = copyWithTimeout(is, os, timeout);
      // Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
      os.flush();
      return retval;
    }
  }
  finally {
    is.close();
  }
}
/**
* Copy from `is` to `os` and close the streams regardless of the result.
@ -119,35 +59,6 @@ StreamUtils
}
}
/**
 * Copy from the input stream to the output stream and tries to exit if the copy exceeds the timeout. The timeout
 * is best effort. Specifically, `is.read` will not be interrupted.
 * <p>
 * The elapsed time is checked after each successful read and before the corresponding write, so the chunk that was
 * read when the timeout is detected is not written to `os`. Neither stream is closed by this method.
 *
 * @param is      The input stream to read bytes from.
 * @param os      The output stream to write bytes to.
 * @param timeout The timeout (in ms) for the copy operation
 *
 * @return The total size of bytes written to `os`
 *
 * @throws IOException      if reading from `is` or writing to `os` fails
 * @throws TimeoutException If `timeout` is exceeded
 */
public static long copyWithTimeout(InputStream is, OutputStream os, long timeout) throws IOException, TimeoutException
{
  final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
  // System.nanoTime() is monotonic, so the elapsed-time measurement is not skewed by wall-clock adjustments.
  final long startNanos = System.nanoTime();
  long size = 0;
  int n;
  while (-1 != (n = is.read(buffer))) {
    // Truncate to whole milliseconds so the comparison keeps the same granularity as the `timeout` argument.
    final long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
    if (elapsedMillis > timeout) {
      throw new TimeoutException(String.format("Copy time has exceeded %,d millis", timeout));
    }
    os.write(buffer, 0, n);
    size += n;
  }
  return size;
}
/**
* Retry copy attempts from input stream to output stream. Does *not* check to make sure data was intact during the transfer
*

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import org.apache.commons.io.FileUtils;
import java.io.Closeable;
import java.io.File;
@ -78,9 +79,7 @@ public class LimitedTemporaryStorage implements Closeable
throw new ISE("Closed");
}
if (!storageDirectory.exists() && !storageDirectory.mkdir()) {
throw new IOException(String.format("Cannot create storageDirectory: %s", storageDirectory));
}
FileUtils.forceMkdir(storageDirectory);
final File theFile = new File(storageDirectory, String.format("%08d.tmp", files.size()));
final EnumSet<StandardOpenOption> openOptions = EnumSet.of(

View File

@ -81,6 +81,7 @@ import io.druid.segment.serde.FloatGenericColumnSupplier;
import io.druid.segment.serde.LongGenericColumnPartSerde;
import io.druid.segment.serde.LongGenericColumnSupplier;
import io.druid.segment.serde.SpatialIndexColumnPartSupplier;
import org.apache.commons.io.FileUtils;
import org.joda.time.Interval;
import java.io.ByteArrayOutputStream;
@ -502,7 +503,7 @@ public class IndexIO
try {
SmooshedFileMapper v8SmooshedFiles = closer.register(Smoosh.map(v8Dir));
v9Dir.mkdirs();
FileUtils.forceMkdir(v9Dir);
final FileSmoosher v9Smoosher = closer.register(new FileSmoosher(v9Dir));
ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin")));

View File

@ -174,12 +174,7 @@ public class IndexMerger
);
}
if (!outDir.exists()) {
outDir.mkdirs();
}
if (!outDir.isDirectory()) {
throw new ISE("Can only persist to directories, [%s] wasn't a directory", outDir);
}
FileUtils.forceMkdir(outDir);
log.info("Starting persist for interval[%s], rows[%,d]", dataInterval, index.size());
return merge(
@ -335,9 +330,7 @@ public class IndexMerger
) throws IOException
{
FileUtils.deleteDirectory(outDir);
if (!outDir.mkdirs()) {
throw new ISE("Couldn't make outdir[%s].", outDir);
}
FileUtils.forceMkdir(outDir);
final List<String> mergedDimensions = getMergedDimensions(indexes);
@ -496,9 +489,7 @@ public class IndexMerger
) throws IOException
{
FileUtils.deleteDirectory(outDir);
if (!outDir.mkdirs()) {
throw new ISE("Couldn't make outdir[%s].", outDir);
}
FileUtils.forceMkdir(outDir);
final List<String> mergedDimensions = getMergedDimensions(indexes);
@ -625,27 +616,16 @@ public class IndexMerger
}
Closer closer = Closer.create();
final Interval dataInterval;
final File v8OutDir = new File(outDir, "v8-tmp");
v8OutDir.mkdirs();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
FileUtils.deleteDirectory(v8OutDir);
}
});
final IOPeon ioPeon = new TmpFileIOPeon();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
ioPeon.close();
}
});
try {
final Interval dataInterval;
final File v8OutDir = new File(outDir, "v8-tmp");
FileUtils.forceMkdir(v8OutDir);
registerDeleteDirectory(closer, v8OutDir);
File tmpPeonFilesDir = new File(v8OutDir, "tmpPeonFiles");
FileUtils.forceMkdir(tmpPeonFilesDir);
registerDeleteDirectory(closer, tmpPeonFilesDir);
final IOPeon ioPeon = new TmpFileIOPeon(tmpPeonFilesDir, true);
closer.register(ioPeon);
/************* Main index.drd file **************/
progress.progress();
long startTime = System.currentTimeMillis();
@ -871,7 +851,7 @@ public class IndexMerger
}
File smooshDir = new File(v8OutDir, "smoosher");
smooshDir.mkdir();
FileUtils.forceMkdir(smooshDir);
for (Map.Entry<String, File> entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) {
entry.getValue().delete();
@ -906,6 +886,18 @@ public class IndexMerger
}
}
/**
 * Registers with the given closer a {@link Closeable} whose {@code close()} recursively deletes {@code dir}
 * via {@code FileUtils.deleteDirectory}.
 *
 * @param closer the closer to register the deletion callback with
 * @param dir    the directory to delete when the registered callback is closed
 */
static void registerDeleteDirectory(Closer closer, final File dir)
{
  final Closeable directoryDeleter = new Closeable()
  {
    @Override
    public void close() throws IOException
    {
      FileUtils.deleteDirectory(dir);
    }
  };
  closer.register(directoryDeleter);
}
protected DimensionHandler[] makeDimensionHandlers(final List<String> mergedDimensions, final List<ColumnCapabilitiesImpl> dimCapabilities)
{
final DimensionHandler[] handlers = new DimensionHandler[mergedDimensions.size()];

View File

@ -57,7 +57,6 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -128,28 +127,18 @@ public class IndexMergerV9 extends IndexMerger
}
Closer closer = Closer.create();
final IOPeon ioPeon = new TmpFileIOPeon(false);
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
ioPeon.close();
}
});
final FileSmoosher v9Smoosher = new FileSmoosher(outDir);
final File v9TmpDir = new File(outDir, "v9-tmp");
v9TmpDir.mkdirs();
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
FileUtils.deleteDirectory(v9TmpDir);
}
});
log.info("Start making v9 index files, outDir:%s", outDir);
try {
final FileSmoosher v9Smoosher = new FileSmoosher(outDir);
final File v9TmpDir = new File(outDir, "v9-tmp");
FileUtils.forceMkdir(v9TmpDir);
registerDeleteDirectory(closer, v9TmpDir);
log.info("Start making v9 index files, outDir:%s", outDir);
File tmpPeonFilesDir = new File(v9TmpDir, "tmpPeonFiles");
FileUtils.forceMkdir(tmpPeonFilesDir);
registerDeleteDirectory(closer, tmpPeonFilesDir);
final IOPeon ioPeon = new TmpFileIOPeon(tmpPeonFilesDir, false);
closer.register(ioPeon);
long startTime = System.currentTimeMillis();
ByteStreams.write(
Ints.toByteArray(IndexIO.V9_VERSION),

View File

@ -42,7 +42,6 @@ import io.druid.segment.data.GenericIndexedWriter;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.data.VSizeIndexedWriter;
import java.io.Closeable;
@ -59,7 +58,6 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
private static final Logger log = new Logger(StringDimensionMergerLegacy.class);
private VSizeIndexedWriter encodedValueWriterV8;
private IOPeon spatialIoPeon;
private File dictionaryFile;
public StringDimensionMergerLegacy(
@ -127,11 +125,10 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
RTree tree = null;
spatialWriter = null;
boolean hasSpatial = capabilities.hasSpatialIndexes();
spatialIoPeon = new TmpFileIOPeon();
if (hasSpatial) {
String spatialFilename = String.format("%s.spatial", dimensionName);
spatialWriter = new ByteBufferWriter<>(
spatialIoPeon, spatialFilename, new IndexedRTree.ImmutableRTreeObjectStrategy(bmpFactory)
ioPeon, spatialFilename, new IndexedRTree.ImmutableRTreeObjectStrategy(bmpFactory)
);
spatialWriter.open();
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bmpFactory), bmpFactory);
@ -210,7 +207,6 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
spatialWriter.close();
serializerUtils.writeString(spatialIndexFile, dimensionName);
ByteStreams.copy(spatialWriter.combineStreams(), spatialIndexFile);
spatialIoPeon.close();
}
}

View File

@ -34,8 +34,9 @@ import java.util.Map;
*/
public class TmpFileIOPeon implements IOPeon
{
private final File dir;
private final boolean allowOverwrite;
Map<String, File> createdFiles = Maps.newLinkedHashMap();
private final Map<String, File> createdFiles = Maps.newLinkedHashMap();
public TmpFileIOPeon()
{
@ -44,6 +45,12 @@ public class TmpFileIOPeon implements IOPeon
/**
* Creates a peon without an explicit directory: delegates to the two-argument constructor with a null dir.
*
* @param allowOverwrite forwarded unchanged to the two-argument constructor
*/
public TmpFileIOPeon(boolean allowOverwrite)
{
this(null, allowOverwrite);
}
/**
 * Creates a peon that remembers the directory and overwrite flag it was given.
 *
 * @param dir            directory later handed to {@code File.createTempFile(prefix, suffix, dir)}; may be null,
 *                       in which case {@code createTempFile} falls back to the default temporary-file directory
 * @param allowOverwrite flag consulted when a stream is requested for a filename that already has a file
 */
public TmpFileIOPeon(File dir, boolean allowOverwrite)
{
  this.allowOverwrite = allowOverwrite;
  this.dir = dir;
}
@ -52,7 +59,7 @@ public class TmpFileIOPeon implements IOPeon
{
File retFile = createdFiles.get(filename);
if (retFile == null) {
retFile = File.createTempFile("filePeon", filename);
retFile = File.createTempFile("filePeon", filename, dir);
createdFiles.put(filename, retFile);
return new BufferedOutputStream(new FileOutputStream(retFile));
} else if (allowOverwrite) {

View File

@ -106,7 +106,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
// moving the temporary directory to the final destination, once success the potentially concurrent push operations
// will be failed and will read the descriptor.json created by current push operation directly
createDirectoryIfNotExists(outDir.getParentFile());
FileUtils.forceMkdir(outDir.getParentFile());
try {
java.nio.file.Files.move(tmpOutDir.toPath(), outDir.toPath());
}
@ -118,13 +118,6 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
return dataSegment;
}
/**
 * Ensures that {@code directory} exists as a directory, creating it and any missing parents if needed.
 *
 * @param directory the directory to create
 *
 * @throws IOException if the directory could not be created and does not already exist as a directory
 */
private void createDirectoryIfNotExists(File directory) throws IOException
{
  // mkdirs() returns false when the directory already exists, so fall back to an isDirectory() check.
  final boolean available = directory.mkdirs() || directory.isDirectory();
  if (available) {
    return;
  }
  throw new IOException(String.format("Cannot create directory[%s]", directory.toString()));
}
private String intermediateDirFor(String storageDir)
{
return "intermediate_pushes/" + storageDir + "." + UUID.randomUUID().toString();
@ -132,7 +125,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
private long compressSegment(File dataSegmentFile, File outDir) throws IOException
{
createDirectoryIfNotExists(outDir);
FileUtils.forceMkdir(outDir);
File outFile = new File(outDir, "index.zip");
log.info("Compressing files from[%s] to [%s]", dataSegmentFile, outFile);
return CompressionUtils.zip(dataSegmentFile, outFile);

View File

@ -961,9 +961,7 @@ public class AppenderatorImpl implements Appenderator
private File createPersistDirIfNeeded(SegmentIdentifier identifier) throws IOException
{
final File persistDir = computePersistDir(identifier);
if (!persistDir.mkdir() && !persistDir.exists()) {
throw new IOException(String.format("Could not create directory: %s", persistDir));
}
FileUtils.forceMkdir(persistDir);
objectMapper.writeValue(computeIdentifierFile(identifier), identifier);

View File

@ -119,7 +119,7 @@ public class LocalDataSegmentPusherTest
public void testPushCannotCreateDirectory() throws IOException
{
exception.expect(IOException.class);
exception.expectMessage("Cannot create directory");
exception.expectMessage("Unable to create directory");
config.storageDirectory = new File(config.storageDirectory, "xxx");
Assert.assertTrue(config.storageDirectory.mkdir());
config.storageDirectory.setWritable(false);

View File

@ -219,19 +219,18 @@ public class PullDependencies implements Runnable
final File extensionsDir = new File(extensionsConfig.getDirectory());
final File hadoopDependenciesDir = new File(extensionsConfig.getHadoopDependenciesDir());
if (clean) {
try {
try {
if (clean) {
FileUtils.deleteDirectory(extensionsDir);
FileUtils.deleteDirectory(hadoopDependenciesDir);
}
catch (IOException e) {
log.error("Unable to clear extension directory at [%s]", extensionsConfig.getDirectory());
throw Throwables.propagate(e);
}
FileUtils.forceMkdir(extensionsDir);
FileUtils.forceMkdir(hadoopDependenciesDir);
}
catch (IOException e) {
log.error(e, "Unable to clear or create extension directory at [%s]", extensionsDir);
throw Throwables.propagate(e);
}
createRootExtensionsDirectory(extensionsDir);
createRootExtensionsDirectory(hadoopDependenciesDir);
log.info(
"Start pull-deps with local repository [%s] and remote repositories [%s]",
@ -461,22 +460,6 @@ public class PullDependencies implements Runnable
}
}
/**
 * Creates the root extensions directory at the given location unless it already exists as a directory.
 *
 * @param atLocation the directory to create, including any missing parent directories
 *
 * @throws ISE if the location is not an existing directory and {@code mkdirs()} fails to create it
 */
private void createRootExtensionsDirectory(File atLocation)
{
  if (atLocation.isDirectory()) {
    log.info("Root extension directory [%s] already exists, skip creating", atLocation.getAbsolutePath());
    return;
  }
  if (!atLocation.mkdirs()) {
    // Pass the path as a format argument instead of pre-formatting it: ISE treats its first argument as a
    // format string, so a path containing '%' must not end up inside that string.
    throw new ISE("Unable to create extensions directory at [%s]", atLocation.getAbsolutePath());
  }
}
/**
* Create the extension directory for a specific maven coordinate.
* The name of this directory should be the artifactId in the coordinate

View File

@ -22,9 +22,7 @@ package io.druid.cli;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.guice.ExtensionsConfig;
import io.druid.java.util.common.ISE;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.eclipse.aether.artifact.Artifact;
import org.eclipse.aether.artifact.DefaultArtifact;
@ -168,9 +166,9 @@ public class PullDependenciesTest
}
/**
* A file exists on the root extension directory path, but it's not a directory, throw ISE.
* A file exists on the root extension directory path, but it's not a directory, throw exception.
*/
@Test(expected = ISE.class)
@Test(expected = RuntimeException.class)
public void testPullDependencies_root_extension_dir_bad_state() throws IOException
{
Assert.assertTrue(rootExtensionsDir.createNewFile());
@ -188,9 +186,9 @@ public class PullDependenciesTest
}
/**
* A file exists on the root hadoop dependencies directory path, but it's not a directory, throw ISE.
* A file exists on the root hadoop dependencies directory path, but it's not a directory, throw exception.
*/
@Test(expected = ISE.class)
@Test(expected = RuntimeException.class)
public void testPullDependencies_root_hadoop_dependencies_dir_bad_state() throws IOException
{
Assert.assertTrue(rootHadoopDependenciesDir.createNewFile());