diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index 2451729caa1..afbfe10ada7 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -20,12 +20,9 @@
 package com.metamx.druid.coordination;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.druid.client.DataSegment;
@@ -39,7 +36,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -279,49 +275,34 @@ public class ZkCoordinator implements DataSegmentChangeHandler
   {
     try {
       final List<String> segmentFailures = Lists.newArrayList();
-      Iterable<DataSegment> validSegments = FunctionalIterable
-          .create(segments)
-          .transform(
-              new Function<DataSegment, DataSegment>()
-              {
-                @Nullable
-                @Override
-                public DataSegment apply(@Nullable DataSegment segment)
-                {
-                  if (segment == null) {
-                    return null;
-                  }
+      final List<DataSegment> validSegments = Lists.newArrayList();
 
-                  log.info("Loading segment %s", segment.getIdentifier());
+      for (DataSegment segment : segments) {
+        log.info("Loading segment %s", segment.getIdentifier());
 
-                  try {
-                    serverManager.loadSegment(segment);
-                  }
-                  catch (Exception e) {
-                    log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
-                    removeSegment(segment);
-                    segmentFailures.add(segment.getIdentifier());
-                    return null;
-                  }
+        try {
+          serverManager.loadSegment(segment);
+        }
+        catch (Exception e) {
+          log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
+          removeSegment(segment);
+          segmentFailures.add(segment.getIdentifier());
+        }
 
-                  File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
-                  if (!segmentInfoCacheFile.exists()) {
-                    try {
-                      jsonMapper.writeValue(segmentInfoCacheFile, segment);
-                    }
-                    catch (IOException e) {
-                      log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
-                      removeSegment(segment);
-                      segmentFailures.add(segment.getIdentifier());
-                      return null;
-                    }
-                  }
+        File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.exists()) {
+          try {
+            jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          }
+          catch (IOException e) {
+            log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
+            removeSegment(segment);
+            segmentFailures.add(segment.getIdentifier());
+          }
+        }
 
-                  return segment;
-                }
-              }
-          )
-          .filter(Predicates.notNull());
+        validSegments.add(segment);
+      }
 
       try {
         announcer.announceSegments(validSegments);
@@ -331,7 +312,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       }
 
       if (!segmentFailures.isEmpty()) {
-        throw new SegmentLoadingException("Error loading segments: %s", segmentFailures);
+        log.error("Exception loading segments: %s", segmentFailures);
+        throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
      }
    }
    catch (SegmentLoadingException e) {
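For reviewers who want to poke at the new control flow in isolation: below is a minimal, self-contained sketch of the eager loading loop that replaces the old FunctionalIterable transform/filter pipeline. The Segment record and loadSegment() stub are hypothetical stand-ins for DataSegment and serverManager.loadSegment(), not the Druid classes; note also that the sketch short-circuits with continue on a load failure, whereas the hunk above falls through after calling removeSegment() and recording the identifier.

import java.util.ArrayList;
import java.util.List;

public class EagerSegmentLoadSketch
{
  // Hypothetical stand-in for DataSegment; only an identifier is needed here.
  record Segment(String identifier, boolean loadable) {}

  // Hypothetical stand-in for serverManager.loadSegment(segment).
  static void loadSegment(Segment segment) throws Exception
  {
    if (!segment.loadable()) {
      throw new Exception("failed to load " + segment.identifier());
    }
  }

  public static void main(String[] args)
  {
    final List<Segment> segments = List.of(
        new Segment("wikipedia_2013-07-01", true),
        new Segment("wikipedia_2013-07-02", false)
    );

    final List<String> segmentFailures = new ArrayList<>();
    final List<Segment> validSegments = new ArrayList<>();

    // Eager equivalent of the old transform/filter(Predicates.notNull())
    // pipeline: load each segment, record failures by identifier, and
    // collect the successes so they can be announced in one batch.
    for (Segment segment : segments) {
      try {
        loadSegment(segment);
      }
      catch (Exception e) {
        segmentFailures.add(segment.identifier());
        continue; // keep the failed segment out of the announced batch
      }
      validSegments.add(segment);
    }

    // Mirrors the new error summary: a count, not the full identifier list.
    System.out.printf(
        "announced %d segments; %,d errors seen while loading segments%n",
        validSegments.size(),
        segmentFailures.size()
    );
  }
}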