add an optional delay for how long it takes to drop a segment; fixes all manner of inconsistency issues

fjy 2014-06-19 16:32:38 -07:00
parent 276e48e564
commit b313601a3b
4 changed files with 45 additions and 9 deletions

@@ -228,7 +228,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
       final QueryableDruidServer queryableDruidServer = segment.lhs.pick();

       if (queryableDruidServer == null) {
-        log.error("No servers found for %s?! How can this be?!", segment.rhs);
+        log.makeAlert("No servers found for %s?! How can this be?!", segment.rhs).emit();
       } else {
         final DruidServer server = queryableDruidServer.getServer();
         List<SegmentDescriptor> descriptors = serverSegments.get(server);

@@ -37,6 +37,9 @@ public class SegmentLoaderConfig
   @JsonProperty("deleteOnRemove")
   private boolean deleteOnRemove = true;

+  @JsonProperty("dropSegmentDelayMillis")
+  private int dropSegmentDelayMillis = 0;
+
   @JsonProperty
   private File infoDir = null;

@@ -50,6 +53,11 @@ public class SegmentLoaderConfig
     return deleteOnRemove;
   }

+  public int getDropSegmentDelayMillis()
+  {
+    return dropSegmentDelayMillis;
+  }
+
   public File getInfoDir()
   {
     if (infoDir == null) {

@@ -72,9 +80,10 @@ public class SegmentLoaderConfig
   public String toString()
   {
     return "SegmentLoaderConfig{" +
-           "locations=" + getLocations() +
-           ", deleteOnRemove=" + isDeleteOnRemove() +
-           ", infoDir=" + getInfoDir() +
+           "locations=" + locations +
+           ", deleteOnRemove=" + deleteOnRemove +
+           ", dropSegmentDelayMillis=" + dropSegmentDelayMillis +
+           ", infoDir=" + infoDir +
            '}';
   }
 }
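
For reference, a minimal sketch (not part of this commit) of how the new property behaves once bound: leaving dropSegmentDelayMillis unset keeps the default of 0, i.e. the old drop-immediately behavior, while any positive value defers the actual drop by that many milliseconds. The sketch assumes SegmentLoaderConfig can be materialized with a plain Jackson ObjectMapper; the runtime property prefix that feeds it on a historical node is not shown in this diff.

// Sketch only: assumes SegmentLoaderConfig deserializes with a plain ObjectMapper,
// which is roughly how Druid's configuration binding materializes it.
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.loading.SegmentLoaderConfig;

public class DropSegmentDelayExample
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // No property set: the default of 0 preserves the old "drop immediately" behavior.
    final SegmentLoaderConfig defaults = mapper.readValue("{}", SegmentLoaderConfig.class);
    System.out.println(defaults.getDropSegmentDelayMillis()); // 0

    // Property set: the actual ServerManager drop is deferred by 30 seconds.
    final SegmentLoaderConfig delayed = mapper.readValue(
        "{\"dropSegmentDelayMillis\": 30000}",
        SegmentLoaderConfig.class
    );
    System.out.println(delayed.getDropSegmentDelayMillis()); // 30000
  }
}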

@@ -20,8 +20,10 @@
 package io.druid.server.coordination;

 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
+import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.segment.loading.SegmentLoadingException;

@@ -32,6 +34,8 @@ import org.apache.curator.framework.CuratorFramework;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;

 /**
  */

@@ -43,6 +47,7 @@ public class ZkCoordinator extends BaseZkCoordinator
   private final SegmentLoaderConfig config;
   private final DataSegmentAnnouncer announcer;
   private final ServerManager serverManager;
+  private final ScheduledExecutorService exec;

   @Inject
   public ZkCoordinator(

@@ -52,7 +57,8 @@ public class ZkCoordinator extends BaseZkCoordinator
       DruidServerMetadata me,
       DataSegmentAnnouncer announcer,
       CuratorFramework curator,
-      ServerManager serverManager
+      ServerManager serverManager,
+      ScheduledExecutorFactory factory
   )
   {
     super(jsonMapper, zkPaths, me, curator);

@@ -61,6 +67,8 @@ public class ZkCoordinator extends BaseZkCoordinator
     this.config = config;
     this.announcer = announcer;
     this.serverManager = serverManager;
+    this.exec = factory.create(1, "ZkCoordinator-Exec--%d");
   }

   @Override

@@ -225,17 +233,33 @@ public class ZkCoordinator extends BaseZkCoordinator
   @Override
-  public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
+  public void removeSegment(final DataSegment segment, DataSegmentChangeCallback callback)
   {
     try {
-      serverManager.dropSegment(segment);
       File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
       if (!segmentInfoCacheFile.delete()) {
         log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
       }

       announcer.unannounceSegment(segment);
+
+      exec.schedule(
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              try {
+                serverManager.dropSegment(segment);
+              }
+              catch (Exception e) {
+                throw Throwables.propagate(e);
+              }
+            }
+          },
+          config.getDropSegmentDelayMillis(),
+          TimeUnit.MILLISECONDS
+      );
     }
     catch (Exception e) {
       log.makeAlert(e, "Failed to remove segment")
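
To make the intent of the new removeSegment flow explicit: the segment is unannounced immediately, but the local data is only dropped after config.getDropSegmentDelayMillis() elapses, presumably so that queries already routed to this node before the unannouncement propagates can still be served. A self-contained sketch of that unannounce-now, drop-later pattern, using hypothetical helper names rather than Druid classes:

// Illustrative sketch only (hypothetical names); mirrors the pattern
// ZkCoordinator.removeSegment adopts in this commit.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedDropSketch
{
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

  public void removeSegment(final String segmentId, long delayMillis)
  {
    unannounce(segmentId); // stop advertising the segment immediately

    exec.schedule(
        new Runnable()
        {
          @Override
          public void run()
          {
            drop(segmentId); // free the local copy only after the configured delay
          }
        },
        delayMillis,
        TimeUnit.MILLISECONDS
    );
  }

  private void unannounce(String segmentId) { System.out.println("unannounced " + segmentId); }

  private void drop(String segmentId) { System.out.println("dropped " + segmentId); }
}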

@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.concurrent.ScheduledExecutors;
+import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.cache.LocalCacheProvider;

@@ -117,7 +119,8 @@ public class ZkCoordinatorTest extends CuratorTestBase
         me,
         announcer,
         curator,
-        serverManager
+        serverManager,
+        ScheduledExecutors.createFactory(new Lifecycle())
     );
   }