Merge pull request #614 from metamx/delay-drop

add an optional delay for how long it takes to drop a segment, fixes all...
This commit is contained in:
xvrl 2014-06-19 22:38:26 -07:00
commit 12ee17c4d2
5 changed files with 49 additions and 9 deletions

View File

@ -154,6 +154,7 @@ Druid storage nodes maintain information about segments they have already downlo
|--------|-----------|-------|
|`druid.segmentCache.locations`|Segments assigned to a Historical node are first stored on the local file system (in a disk cache) and then served by the Historical node. These locations define where that local cache resides. | none (no caching) |
|`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true|
|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping a segment.|5 minutes|
|`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
### Jetty Server Module

View File

@ -228,7 +228,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final QueryableDruidServer queryableDruidServer = segment.lhs.pick(); final QueryableDruidServer queryableDruidServer = segment.lhs.pick();
if (queryableDruidServer == null) { if (queryableDruidServer == null) {
log.error("No servers found for %s?! How can this be?!", segment.rhs); log.makeAlert("No servers found for %s?! How can this be?!", segment.rhs).emit();
} else { } else {
final DruidServer server = queryableDruidServer.getServer(); final DruidServer server = queryableDruidServer.getServer();
List<SegmentDescriptor> descriptors = serverSegments.get(server); List<SegmentDescriptor> descriptors = serverSegments.get(server);

View File

@ -37,6 +37,9 @@ public class SegmentLoaderConfig
@JsonProperty("deleteOnRemove") @JsonProperty("deleteOnRemove")
private boolean deleteOnRemove = true; private boolean deleteOnRemove = true;
@JsonProperty("dropSegmentDelayMillis")
private int dropSegmentDelayMillis = 5 * 60 * 1000; // 5 mins
@JsonProperty @JsonProperty
private File infoDir = null; private File infoDir = null;
@ -50,6 +53,11 @@ public class SegmentLoaderConfig
return deleteOnRemove; return deleteOnRemove;
} }
/**
 * Returns the delay, in milliseconds, between unannouncing a segment and actually
 * dropping it from this node (set via the {@code dropSegmentDelayMillis} JSON property,
 * documented as {@code druid.segmentCache.dropSegmentDelayMillis}; default 5 minutes).
 */
public int getDropSegmentDelayMillis()
{
return dropSegmentDelayMillis;
}
public File getInfoDir() public File getInfoDir()
{ {
if (infoDir == null) { if (infoDir == null) {
@ -72,9 +80,10 @@ public class SegmentLoaderConfig
public String toString() public String toString()
{ {
return "SegmentLoaderConfig{" + return "SegmentLoaderConfig{" +
"locations=" + getLocations() + "locations=" + locations +
", deleteOnRemove=" + isDeleteOnRemove() + ", deleteOnRemove=" + deleteOnRemove +
", infoDir=" + getInfoDir() + ", dropSegmentDelayMillis=" + dropSegmentDelayMillis +
", infoDir=" + infoDir +
'}'; '}';
} }
} }

View File

@ -20,8 +20,10 @@
package io.druid.server.coordination; package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.SegmentLoadingException;
@ -32,6 +34,8 @@ import org.apache.curator.framework.CuratorFramework;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/** /**
*/ */
@ -43,6 +47,7 @@ public class ZkCoordinator extends BaseZkCoordinator
private final SegmentLoaderConfig config; private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer announcer; private final DataSegmentAnnouncer announcer;
private final ServerManager serverManager; private final ServerManager serverManager;
private final ScheduledExecutorService exec;
@Inject @Inject
public ZkCoordinator( public ZkCoordinator(
@ -52,7 +57,8 @@ public class ZkCoordinator extends BaseZkCoordinator
DruidServerMetadata me, DruidServerMetadata me,
DataSegmentAnnouncer announcer, DataSegmentAnnouncer announcer,
CuratorFramework curator, CuratorFramework curator,
ServerManager serverManager ServerManager serverManager,
ScheduledExecutorFactory factory
) )
{ {
super(jsonMapper, zkPaths, me, curator); super(jsonMapper, zkPaths, me, curator);
@ -61,6 +67,8 @@ public class ZkCoordinator extends BaseZkCoordinator
this.config = config; this.config = config;
this.announcer = announcer; this.announcer = announcer;
this.serverManager = serverManager; this.serverManager = serverManager;
this.exec = factory.create(1, "ZkCoordinator-Exec--%d");
} }
@Override @Override
@ -225,17 +233,36 @@ public class ZkCoordinator extends BaseZkCoordinator
@Override @Override
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback)
{ {
try { try {
serverManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) { if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
} }
announcer.unannounceSegment(segment); announcer.unannounceSegment(segment);
log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis());
exec.schedule(
new Runnable()
{
@Override
public void run()
{
try {
serverManager.dropSegment(segment);
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
.addData("segment", segment)
.emit();
}
}
},
config.getDropSegmentDelayMillis(),
TimeUnit.MILLISECONDS
);
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Failed to remove segment") log.makeAlert(e, "Failed to remove segment")

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.LocalCacheProvider; import io.druid.client.cache.LocalCacheProvider;
@ -117,7 +119,8 @@ public class ZkCoordinatorTest extends CuratorTestBase
me, me,
announcer, announcer,
curator, curator,
serverManager serverManager,
ScheduledExecutors.createFactory(new Lifecycle())
); );
} }