diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java index df54c329d3e..2451729caa1 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java @@ -20,9 +20,12 @@ package com.metamx.druid.coordination; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Predicates; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.client.DataSegment; @@ -36,6 +39,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.List; @@ -107,7 +111,13 @@ public class ZkCoordinator implements DataSegmentChangeHandler curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); if (config.isLoadFromSegmentCacheEnabled()) { - loadCache(); + try { + loadCache(); + } + catch (Exception e) { + log.makeAlert(e, "Exception loading from cache") + .emit(); + } } loadQueueCache.getListenable().addListener( @@ -254,7 +264,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler announcer.announceSegment(segment); } catch (IOException e) { - removeSegment(segment); throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); } @@ -269,38 +278,61 @@ public class ZkCoordinator implements DataSegmentChangeHandler public void addSegments(Iterable segments) { try { - for (DataSegment segment : segments) { - log.info("Loading segment %s", segment.getIdentifier()); + final List segmentFailures = Lists.newArrayList(); + Iterable validSegments = FunctionalIterable + .create(segments) + .transform( + new Function() + { + @Nullable + @Override + public DataSegment apply(@Nullable DataSegment segment) + { + if (segment == null) { + return null; + } - try { - serverManager.loadSegment(segment); - } - catch (Exception e) { - removeSegment(segment); - throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); - } + log.info("Loading segment %s", segment.getIdentifier()); - File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); - if (!segmentInfoCacheFile.exists()) { - try { - jsonMapper.writeValue(segmentInfoCacheFile, segment); - } - catch (IOException e) { - removeSegment(segment); - throw new SegmentLoadingException( - e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile - ); - } - } - } + try { + serverManager.loadSegment(segment); + } + catch (Exception e) { + log.error(e, "Exception loading segment[%s]", segment.getIdentifier()); + removeSegment(segment); + segmentFailures.add(segment.getIdentifier()); + return null; + } + + File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); + if (!segmentInfoCacheFile.exists()) { + try { + jsonMapper.writeValue(segmentInfoCacheFile, segment); + } + catch (IOException e) { + log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile); + removeSegment(segment); + segmentFailures.add(segment.getIdentifier()); + return null; + } + } + + return segment; + } + } + ) + .filter(Predicates.notNull()); try { - announcer.announceSegments(segments); + announcer.announceSegments(validSegments); } catch (IOException e) { - removeSegments(segments); throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments); } + + if (!segmentFailures.isEmpty()) { + throw new SegmentLoadingException("Error loading segments: %s", segmentFailures); + } } catch (SegmentLoadingException e) { log.makeAlert(e, "Failed to load segments for dataSource")