Merge pull request #329 from metamx/segment-load-idempotent

ZkCoordinator: Make addSegment, addSegments idempotent
This commit is contained in:
fjy 2013-12-16 11:59:22 -08:00
commit 09f73e2bf2
2 changed files with 44 additions and 31 deletions

View File

@ -116,7 +116,13 @@ public class ServerManager implements QuerySegmentWalker
return segmentLoader.isSegmentLoaded(segment); return segmentLoader.isSegmentLoaded(segment);
} }
public void loadSegment(final DataSegment segment) throws SegmentLoadingException /**
* Load a single segment.
* @param segment segment to load
* @return true if the segment was newly loaded, false if it was already loaded
* @throws SegmentLoadingException if the segment cannot be loaded
*/
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
{ {
final Segment adapter; final Segment adapter;
try { try {
@ -150,8 +156,8 @@ public class ServerManager implements QuerySegmentWalker
segment.getVersion() segment.getVersion()
); );
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) { if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
log.info("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier()); log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
throw new SegmentLoadingException("Segment already exists[%s]", segment.getIdentifier()); return false;
} }
loadedIntervals.add( loadedIntervals.add(
@ -165,6 +171,7 @@ public class ServerManager implements QuerySegmentWalker
synchronized (dataSourceCounts) { synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, 1L); dataSourceCounts.add(dataSource, 1L);
} }
return true;
} }
} }

View File

@ -230,34 +230,37 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try { try {
log.info("Loading segment %s", segment.getIdentifier()); log.info("Loading segment %s", segment.getIdentifier());
final boolean loaded;
try { try {
serverManager.loadSegment(segment); loaded = serverManager.loadSegment(segment);
} }
catch (Exception e) { catch (Exception e) {
removeSegment(segment); removeSegment(segment);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier()); throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
} }
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (loaded) {
if (!segmentInfoCacheFile.exists()) { File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
removeSegment(segment);
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
}
try { try {
jsonMapper.writeValue(segmentInfoCacheFile, segment); announcer.announceSegment(segment);
} }
catch (IOException e) { catch (IOException e) {
removeSegment(segment); throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
} }
} }
try {
announcer.announceSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
} }
catch (SegmentLoadingException e) { catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment for dataSource") log.makeAlert(e, "Failed to load segment for dataSource")
@ -275,8 +278,9 @@ public class ZkCoordinator implements DataSegmentChangeHandler
for (DataSegment segment : segments) { for (DataSegment segment : segments) {
log.info("Loading segment %s", segment.getIdentifier()); log.info("Loading segment %s", segment.getIdentifier());
final boolean loaded;
try { try {
serverManager.loadSegment(segment); loaded = serverManager.loadSegment(segment);
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Exception loading segment[%s]", segment.getIdentifier()); log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
@ -285,20 +289,22 @@ public class ZkCoordinator implements DataSegmentChangeHandler
continue; continue;
} }
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (loaded) {
if (!segmentInfoCacheFile.exists()) { File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
try { if (!segmentInfoCacheFile.exists()) {
jsonMapper.writeValue(segmentInfoCacheFile, segment); try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
removeSegment(segment);
segmentFailures.add(segment.getIdentifier());
continue;
}
} }
catch (IOException e) {
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
removeSegment(segment);
segmentFailures.add(segment.getIdentifier());
continue;
}
}
validSegments.add(segment); validSegments.add(segment);
}
} }
try { try {