diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index df54c329d3e..39799c812ee 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -107,7 +107,13 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
 
       if (config.isLoadFromSegmentCacheEnabled()) {
-        loadCache();
+        try {
+          loadCache();
+        }
+        catch (Exception e) {
+          log.makeAlert(e, "Exception loading from cache")
+             .emit();
+        }
       }
 
       loadQueueCache.getListenable().addListener(
@@ -254,7 +260,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       announcer.announceSegment(segment);
     }
     catch (IOException e) {
-      removeSegment(segment);
       throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
     }
   }
@@ -269,6 +274,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
   public void addSegments(Iterable<DataSegment> segments)
   {
     try {
+      final List<String> segmentFailures = Lists.newArrayList();
+      final List<DataSegment> validSegments = Lists.newArrayList();
+
       for (DataSegment segment : segments) {
         log.info("Loading segment %s", segment.getIdentifier());
 
@@ -276,8 +284,10 @@ public class ZkCoordinator implements DataSegmentChangeHandler
           serverManager.loadSegment(segment);
         }
         catch (Exception e) {
+          log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
          removeSegment(segment);
-          throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
+          segmentFailures.add(segment.getIdentifier());
+          continue;
         }
 
         File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
@@ -286,21 +296,29 @@ public class ZkCoordinator implements DataSegmentChangeHandler
             jsonMapper.writeValue(segmentInfoCacheFile, segment);
           }
           catch (IOException e) {
+            log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
             removeSegment(segment);
-            throw new SegmentLoadingException(
-                e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
-            );
+            segmentFailures.add(segment.getIdentifier());
+            continue;
           }
         }
+
+        validSegments.add(segment);
       }
 
       try {
-        announcer.announceSegments(segments);
+        announcer.announceSegments(validSegments);
       }
       catch (IOException e) {
-        removeSegments(segments);
         throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
       }
+
+      if (!segmentFailures.isEmpty()) {
+        for (String segmentFailure : segmentFailures) {
+          log.error("%s failed to load", segmentFailure);
+        }
+        throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
+      }
     }
     catch (SegmentLoadingException e) {
       log.makeAlert(e, "Failed to load segments for dataSource")