Merge pull request #214 from metamx/startup-fix

Fix Zk Coordinator failing startup due to server manager exceptions and clean up example configs
This commit is contained in:
cheddar 2013-08-12 13:17:58 -07:00
commit de5e746d48
1 changed file with 26 additions and 8 deletions

View File

@ -107,8 +107,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
if (config.isLoadFromSegmentCacheEnabled()) {
try {
loadCache();
}
catch (Exception e) {
log.makeAlert(e, "Exception loading from cache")
.emit();
}
}
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
@ -254,7 +260,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
announcer.announceSegment(segment);
}
catch (IOException e) {
removeSegment(segment);
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
@ -269,6 +274,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
public void addSegments(Iterable<DataSegment> segments)
{
try {
final List<String> segmentFailures = Lists.newArrayList();
final List<DataSegment> validSegments = Lists.newArrayList();
for (DataSegment segment : segments) {
log.info("Loading segment %s", segment.getIdentifier());
@ -276,8 +284,10 @@ public class ZkCoordinator implements DataSegmentChangeHandler
serverManager.loadSegment(segment);
}
catch (Exception e) {
log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
removeSegment(segment);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
segmentFailures.add(segment.getIdentifier());
continue;
}
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
@ -286,21 +296,29 @@ public class ZkCoordinator implements DataSegmentChangeHandler
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
removeSegment(segment);
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
segmentFailures.add(segment.getIdentifier());
continue;
}
}
validSegments.add(segment);
}
try {
announcer.announceSegments(segments);
announcer.announceSegments(validSegments);
}
catch (IOException e) {
removeSegments(segments);
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
}
if (!segmentFailures.isEmpty()) {
for (String segmentFailure : segmentFailures) {
log.error("%s failed to load", segmentFailure);
}
throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
}
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments for dataSource")