Merge branch 'master' into bug-fix

This commit is contained in:
fjy 2013-07-02 15:59:52 -07:00
commit 9818d28a26
17 changed files with 193 additions and 139 deletions

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -24,6 +24,10 @@ import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
@ -31,10 +35,6 @@ import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -141,9 +141,9 @@ public class TaskToolbox
new SegmentLoaderConfig()
{
@Override
public File getCacheDirectory()
public String getCacheDirectory()
{
return new File(getTaskWorkDir(), "fetched_segments");
return new File(getTaskWorkDir(), "fetched_segments").toString();
}
}
);

View File

@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -662,6 +662,14 @@ public class RealtimePlumberSchool implements PlumberSchool
*/
private int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
{
if (indexToPersist.hasSwapped()) {
log.info(
"DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
schema.getDataSource(), interval, indexToPersist
);
return 0;
}
log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
try {
int numRows = indexToPersist.getIndex().size();

View File

@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -261,7 +261,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
announcer.unannounceSegment(segment);
}
catch (Exception e) {
log.makeAlert("Failed to remove segment")
log.makeAlert(e, "Failed to remove segment")
.addData("segment", segment)
.emit();
}

View File

@ -39,9 +39,13 @@ public interface DataSegmentPuller
/**
* Returns the last modified time of the given segment.
*
* Note, this is not actually used at this point and doesn't need to actually be implemented. It's just still here
* to not break compatibility.
*
* @param segment The segment to check the last modified time for
* @return the last modified time in millis from the epoch
* @throws SegmentLoadingException if there are any errors
*/
@Deprecated
public long getLastModified(DataSegment segment) throws SegmentLoadingException;
}

View File

@ -20,32 +20,14 @@
package com.metamx.druid.loading;
import com.google.common.base.Joiner;
import com.metamx.common.MapUtils;
import com.metamx.druid.client.DataSegment;
import java.util.Map;
/**
*/
public class DataSegmentPusherUtil
{
private static final Joiner JOINER = Joiner.on("/").skipNulls();
public static String getLegacyStorageDir(DataSegment segment)
{
final Map<String,Object> loadSpec = segment.getLoadSpec();
String specType = MapUtils.getString(loadSpec, "type");
if (specType.startsWith("s3")) {
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
return String.format("%s/%s", s3Bucket, s3Path.substring(0, s3Path.lastIndexOf("/")));
}
return null;
}
public static String getStorageDir(DataSegment segment)
{
return JOINER.join(

View File

@ -21,14 +21,18 @@ package com.metamx.druid.loading;
import org.skife.config.Config;
import java.io.File;
/**
*/
public abstract class SegmentLoaderConfig
{
@Config({"druid.paths.indexCache", "druid.segmentCache.path"})
public abstract File getCacheDirectory();
public abstract String getCacheDirectory();
@Config("druid.server.maxSize")
public long getServerMaxSize()
{
return Long.MAX_VALUE;
}
@Config("druid.segmentCache.deleteOnRemove")
public boolean deleteOnRemove()

View File

@ -19,9 +19,13 @@
package com.metamx.druid.loading;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import com.metamx.common.StreamUtils;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
@ -29,7 +33,11 @@ import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import org.apache.commons.io.FileUtils;
import java.io.*;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
*/
@ -39,8 +47,8 @@ public class SingleSegmentLoader implements SegmentLoader
private final DataSegmentPuller dataSegmentPuller;
private final QueryableIndexFactory factory;
private final SegmentLoaderConfig config;
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final List<StorageLocation> locations;
@Inject
public SingleSegmentLoader(
@ -51,22 +59,52 @@ public class SingleSegmentLoader implements SegmentLoader
{
this.dataSegmentPuller = dataSegmentPuller;
this.factory = factory;
this.config = config;
final ImmutableList.Builder<StorageLocation> locBuilder = ImmutableList.builder();
// This is a really, really stupid way of getting this information. Splitting on commas and bars is error-prone
// We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that
// But, that'll have to wait for some other day.
for (String dirSpec : config.getCacheDirectory().split(",")) {
String[] dirSplit = dirSpec.split("\\|");
if (dirSplit.length == 1) {
locBuilder.add(new StorageLocation(new File(dirSplit[0]), config.getServerMaxSize()));
}
else if (dirSplit.length == 2) {
final Long maxSize = Longs.tryParse(dirSplit[1]);
if (maxSize == null) {
throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]);
}
locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize));
}
else {
throw new ISE(
"Unknown segment storage location[%s]=>[%s], config[%s].",
dirSplit.length, dirSpec, config.getCacheDirectory()
);
}
}
locations = locBuilder.build();
Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory.");
log.info("Using storage locations[%s]", locations);
}
@Override
public boolean isSegmentLoaded(final DataSegment segment)
{
File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return true;
}
return findStorageLocationIfLoaded(segment) != null;
}
final File legacyStorageDir = new File(
config.getCacheDirectory(),
DataSegmentPusherUtil.getLegacyStorageDir(segment)
);
return legacyStorageDir.exists();
public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return location;
}
}
return null;
}
@Override
@ -80,111 +118,129 @@ public class SingleSegmentLoader implements SegmentLoader
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
StorageLocation loc = findStorageLocationIfLoaded(segment);
final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment);
if (legacyDir != null) {
File legacyStorageDir = new File(config.getCacheDirectory(), legacyDir);
final File retVal;
if (legacyStorageDir.exists()) {
log.info("Found legacyStorageDir[%s], moving to new storage location[%s]", legacyStorageDir, localStorageDir);
if (localStorageDir.exists()) {
try {
FileUtils.deleteDirectory(localStorageDir);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Error deleting localDir[%s]", localStorageDir);
}
}
final File parentDir = localStorageDir.getParentFile();
if (!parentDir.exists()) {
log.info("Parent[%s] didn't exist, creating.", parentDir);
if (!parentDir.mkdirs()) {
log.warn("Unable to make parentDir[%s]", parentDir);
}
}
if (!legacyStorageDir.renameTo(localStorageDir)) {
log.warn("Failed moving [%s] to [%s]", legacyStorageDir, localStorageDir);
}
if (loc == null) {
Iterator<StorageLocation> locIter = locations.iterator();
loc = locIter.next();
while (locIter.hasNext()) {
loc = loc.mostEmpty(locIter.next());
}
}
if (localStorageDir.exists()) {
long localLastModified = localStorageDir.lastModified();
long remoteLastModified = dataSegmentPuller.getLastModified(segment);
if (remoteLastModified > 0 && localLastModified >= remoteLastModified) {
log.info(
"Found localStorageDir[%s] with modified[%s], which is same or after remote[%s]. Using.",
localStorageDir, localLastModified, remoteLastModified
);
return localStorageDir;
}
}
if (localStorageDir.exists()) {
try {
FileUtils.deleteDirectory(localStorageDir);
}
catch (IOException e) {
log.warn(e, "Exception deleting previously existing local dir[%s]", localStorageDir);
}
}
if (!localStorageDir.mkdirs()) {
log.info("Unable to make parent file[%s]", localStorageDir);
}
dataSegmentPuller.getSegmentFiles(segment, localStorageDir);
return localStorageDir;
}
private File getLocalStorageDir(DataSegment segment)
{
String outputKey = JOINER.join(
segment.getDataSource(),
String.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
return new File(config.getCacheDirectory(), outputKey);
}
private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException
{
log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile);
if (!pulledFile.renameTo(cacheFile)) {
log.warn("Error renaming pulledFile[%s] to cacheFile[%s]. Copying instead.", pulledFile, cacheFile);
try {
StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile);
}
catch (IOException e) {
throw new SegmentLoadingException(
e,
"Problem moving pulledFile[%s] to cache[%s]",
pulledFile,
cacheFile
if (!loc.canHandle(segment.getSize())) {
throw new ISE(
"Segment[%s:%,d] too large for storage[%s:%,d].",
segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
);
}
if (!pulledFile.delete()) {
log.error("Could not delete pulledFile[%s].", pulledFile);
File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (!storageDir.mkdirs()) {
log.debug("Unable to make parent file[%s]", storageDir);
}
dataSegmentPuller.getSegmentFiles(segment, storageDir);
loc.addSegment(segment);
retVal = storageDir;
}
else {
retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
}
loc.addSegment(segment);
return retVal;
}
@Override
public void cleanup(DataSegment segment) throws SegmentLoadingException
{
File cacheFile = getLocalStorageDir(segment);
StorageLocation loc = findStorageLocationIfLoaded(segment);
if (loc == null) {
log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
return;
}
try {
File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
log.info("Deleting directory[%s]", cacheFile);
FileUtils.deleteDirectory(cacheFile);
loc.removeSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}
private static class StorageLocation
{
private final File path;
private final long maxSize;
private final Set<DataSegment> segments;
private volatile long currSize = 0;
StorageLocation(
File path,
long maxSize
)
{
this.path = path;
this.maxSize = maxSize;
this.segments = Sets.newHashSet();
}
private File getPath()
{
return path;
}
private Long getMaxSize()
{
return maxSize;
}
private synchronized void addSegment(DataSegment segment)
{
if (! segments.add(segment)) {
currSize += segment.getSize();
}
}
private synchronized void removeSegment(DataSegment segment)
{
if (segments.remove(segment)) {
currSize -= segment.getSize();
}
}
private boolean canHandle(long size)
{
return available() > size;
}
private synchronized long available()
{
return maxSize - currSize;
}
private StorageLocation mostEmpty(StorageLocation other)
{
return available() > other.available() ? this : other;
}
@Override
public String toString()
{
return "StorageLocation{" +
"path=" + path +
", maxSize=" + maxSize +
'}';
}
}
}

View File

@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.5-SNAPSHOT</version>
<version>0.5.6-SNAPSHOT</version>
</parent>
<dependencies>