From 0c360c05c2619b3351e8a0dfe00d2786ec9acfef Mon Sep 17 00:00:00 2001
From: fjy
Date: Mon, 12 Aug 2013 12:56:59 -0700
Subject: [PATCH 1/4] better handling of exceptions during startup

---
 .../druid/coordination/ZkCoordinator.java | 84 +++++++++++++------
 1 file changed, 58 insertions(+), 26 deletions(-)

diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index df54c329d3e..2451729caa1 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -20,9 +20,12 @@ package com.metamx.druid.coordination;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.druid.client.DataSegment;
@@ -36,6 +39,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -107,7 +111,13 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
 
       if (config.isLoadFromSegmentCacheEnabled()) {
-        loadCache();
+        try {
+          loadCache();
+        }
+        catch (Exception e) {
+          log.makeAlert(e, "Exception loading from cache")
+             .emit();
+        }
       }
 
       loadQueueCache.getListenable().addListener(
@@ -254,7 +264,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
         announcer.announceSegment(segment);
       }
       catch (IOException e) {
-        removeSegment(segment);
         throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
       }
 
@@ -269,38 +278,61 @@ public class ZkCoordinator implements DataSegmentChangeHandler
   public void addSegments(Iterable<DataSegment> segments)
   {
     try {
-      for (DataSegment segment : segments) {
-        log.info("Loading segment %s", segment.getIdentifier());
+      final List<String> segmentFailures = Lists.newArrayList();
+      Iterable<DataSegment> validSegments = FunctionalIterable
+          .create(segments)
+          .transform(
+              new Function<DataSegment, DataSegment>()
+              {
+                @Nullable
+                @Override
+                public DataSegment apply(@Nullable DataSegment segment)
+                {
+                  if (segment == null) {
+                    return null;
+                  }
 
-        try {
-          serverManager.loadSegment(segment);
-        }
-        catch (Exception e) {
-          removeSegment(segment);
-          throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
-        }
+                  log.info("Loading segment %s", segment.getIdentifier());
 
-        File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
-        if (!segmentInfoCacheFile.exists()) {
-          try {
-            jsonMapper.writeValue(segmentInfoCacheFile, segment);
-          }
-          catch (IOException e) {
-            removeSegment(segment);
-            throw new SegmentLoadingException(
-                e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
-            );
-          }
-        }
-      }
+                  try {
+                    serverManager.loadSegment(segment);
+                  }
+                  catch (Exception e) {
+                    log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
+                    removeSegment(segment);
+                    segmentFailures.add(segment.getIdentifier());
+                    return null;
+                  }
+
+                  File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
+                  if (!segmentInfoCacheFile.exists()) {
+                    try {
+                      jsonMapper.writeValue(segmentInfoCacheFile, segment);
+                    }
+                    catch (IOException e) {
+                      log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
+                      removeSegment(segment);
+                      segmentFailures.add(segment.getIdentifier());
+                      return null;
+                    }
+                  }
+
+                  return segment;
+                }
+              }
+          )
+          .filter(Predicates.notNull());
 
       try {
-        announcer.announceSegments(segments);
+        announcer.announceSegments(validSegments);
       }
       catch (IOException e) {
-        removeSegments(segments);
         throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
       }
+
+      if (!segmentFailures.isEmpty()) {
+        throw new SegmentLoadingException("Error loading segments: %s", segmentFailures);
+      }
     }
     catch (SegmentLoadingException e) {
       log.makeAlert(e, "Failed to load segments for dataSource")

From 2ec2957207874e7c3f0ff2e2050794189268ff89 Mon Sep 17 00:00:00 2001
From: fjy
Date: Mon, 12 Aug 2013 13:04:48 -0700
Subject: [PATCH 2/4] clean up the error reporting code according to code review

---
 .../druid/coordination/ZkCoordinator.java | 70 +++++++------------
 1 file changed, 26 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index 2451729caa1..afbfe10ada7 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -20,12 +20,9 @@ package com.metamx.druid.coordination;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.druid.client.DataSegment;
@@ -39,7 +36,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -279,49 +275,34 @@ public class ZkCoordinator implements DataSegmentChangeHandler
   {
     try {
       final List<String> segmentFailures = Lists.newArrayList();
-      Iterable<DataSegment> validSegments = FunctionalIterable
-          .create(segments)
-          .transform(
-              new Function<DataSegment, DataSegment>()
-              {
-                @Nullable
-                @Override
-                public DataSegment apply(@Nullable DataSegment segment)
-                {
-                  if (segment == null) {
-                    return null;
-                  }
+      final List<DataSegment> validSegments = Lists.newArrayList();
 
-                  log.info("Loading segment %s", segment.getIdentifier());
+      for (DataSegment segment : segments) {
+        log.info("Loading segment %s", segment.getIdentifier());
 
-                  try {
-                    serverManager.loadSegment(segment);
-                  }
-                  catch (Exception e) {
-                    log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
-                    removeSegment(segment);
-                    segmentFailures.add(segment.getIdentifier());
-                    return null;
-                  }
+        try {
+          serverManager.loadSegment(segment);
+        }
+        catch (Exception e) {
+          log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
+          removeSegment(segment);
+          segmentFailures.add(segment.getIdentifier());
+        }
 
-                  File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
-                  if (!segmentInfoCacheFile.exists()) {
-                    try {
-                      jsonMapper.writeValue(segmentInfoCacheFile, segment);
-                    }
-                    catch (IOException e) {
-                      log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
-                      removeSegment(segment);
-                      segmentFailures.add(segment.getIdentifier());
-                      return null;
-                    }
-                  }
+        File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
+        if (!segmentInfoCacheFile.exists()) {
+          try {
+            jsonMapper.writeValue(segmentInfoCacheFile, segment);
+          }
+          catch (IOException e) {
+            log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
+            removeSegment(segment);
+            segmentFailures.add(segment.getIdentifier());
+          }
+        }
 
-                  return segment;
-                }
-              }
-          )
-          .filter(Predicates.notNull());
+        validSegments.add(segment);
+      }
 
       try {
         announcer.announceSegments(validSegments);
@@ -331,7 +312,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       }
 
       if (!segmentFailures.isEmpty()) {
-        throw new SegmentLoadingException("Error loading segments: %s", segmentFailures);
+        log.error("Exception loading segments: %s", segmentFailures);
+        throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
       }
     }
     catch (SegmentLoadingException e) {

From dd20950f8ae2342455e6e0a6fd869ffa21ea2153 Mon Sep 17 00:00:00 2001
From: fjy
Date: Mon, 12 Aug 2013 13:07:29 -0700
Subject: [PATCH 3/4] adding missing continue

---
 .../main/java/com/metamx/druid/coordination/ZkCoordinator.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index afbfe10ada7..b444e1d685b 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -287,6 +287,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
           log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
           removeSegment(segment);
           segmentFailures.add(segment.getIdentifier());
+          continue;
         }
 
         File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
@@ -298,6 +299,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
             log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
             removeSegment(segment);
             segmentFailures.add(segment.getIdentifier());
+            continue;
           }
         }
 

From a39eb65edb7e04ba8bba43cedb5d0ac87d9d865a Mon Sep 17 00:00:00 2001
From: fjy
Date: Mon, 12 Aug 2013 13:15:35 -0700
Subject: [PATCH 4/4] change summary of segment failures to one per line

---
 .../java/com/metamx/druid/coordination/ZkCoordinator.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index b444e1d685b..39799c812ee 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -314,7 +314,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       }
 
       if (!segmentFailures.isEmpty()) {
-        log.error("Exception loading segments: %s", segmentFailures);
+        for (String segmentFailure : segmentFailures) {
+          log.error("%s failed to load", segmentFailure);
+        }
        throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
       }
     }