Ignore misnamed segment cache info files. (#4245)

* Ignore misnamed segment cache info files.

Fixes a bug where historical nodes could announce the same segment twice,
ultimately leading to historicals and their watchers (like coordinators
and brokers) being out of sync about which segments are served.

This can happen if Druid is switched from local time to UTC, because
the same segment descriptor then yields different identifiers (one with
a local-time interval before the switch, and one with a UTC interval
after it). As a result, the descriptor is written to multiple segment
cache info files and the segment can be announced twice.
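
A minimal Joda-Time sketch of the underlying behavior (not Druid's actual
identifier code; the "wiki" datasource, "v1" version, and dates are made up
for illustration): the same instants render as different strings depending
on the time zone they are formatted in, so an identifier built from them
changes when the node's default zone changes.

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class IdentifierDriftSketch
{
  public static void main(String[] args)
  {
    // One day of data, created while the node ran in Asia/Tokyo.
    final DateTime start = new DateTime(2017, 5, 1, 0, 0, DateTimeZone.forID("Asia/Tokyo"));
    final DateTime end = start.plusDays(1);

    // Identifier-like strings built from the same interval, before and after
    // switching the node to UTC: same instants, different text.
    final String beforeSwitch = "wiki_" + start + "_" + end + "_v1";
    final String afterSwitch =
        "wiki_" + start.withZone(DateTimeZone.UTC) + "_" + end.withZone(DateTimeZone.UTC) + "_v1";

    System.out.println(beforeSwitch); // wiki_2017-05-01T00:00:00.000+09:00_..._v1
    System.out.println(afterSwitch);  // wiki_2017-04-30T15:00:00.000Z_..._v1
    System.out.println(beforeSwitch.equals(afterSwitch)); // false
  }
}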

Later, if the historical receives a drop request, it drops the segment and
unannounces it once, but the other announcement would stick around in an
ephemeral znode forever, confusing coordinators and brokers.
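
A toy model of the resulting mismatch (not Druid's announcer; the set-based
bookkeeping and identifiers are hypothetical): the same data gets announced
under two identifiers, but a drop removes only the one the node currently
computes.

import java.util.HashSet;
import java.util.Set;

public class StaleAnnouncementSketch
{
  public static void main(String[] args)
  {
    // Stand-in for the per-segment znodes a historical has announced.
    final Set<String> announced = new HashSet<>();

    // Two cache info files exist for the same data: one written before the
    // zone switch (local-time identifier) and one after (UTC identifier).
    announced.add("wiki_2017-05-01T00:00:00.000+09:00_2017-05-02T00:00:00.000+09:00_v1");
    announced.add("wiki_2017-04-30T15:00:00.000Z_2017-05-01T15:00:00.000Z_v1");

    // A drop request unannounces only the identifier the node computes now.
    announced.remove("wiki_2017-04-30T15:00:00.000Z_2017-05-01T15:00:00.000Z_v1");

    // The local-time announcement lingers, so coordinators and brokers still
    // believe the segment is served.
    System.out.println(announced);
  }
}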

* Only alert once.
Gian Merlino authored 2017-05-04 13:02:37 +09:00, committed by David Lim
parent 4502c207af
commit d0f89e969a
1 changed file with 13 additions and 2 deletions

@@ -252,12 +252,17 @@ public class ZkCoordinator implements DataSegmentChangeHandler
 
     List<DataSegment> cachedSegments = Lists.newArrayList();
     File[] segmentsToLoad = baseDir.listFiles();
+    int ignored = 0;
     for (int i = 0; i < segmentsToLoad.length; i++) {
       File file = segmentsToLoad[i];
       log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file);
       try {
-        DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
-        if (serverManager.isSegmentCached(segment)) {
+        final DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
+
+        if (!segment.getIdentifier().equals(file.getName())) {
+          log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getIdentifier());
+          ignored++;
+        } else if (serverManager.isSegmentCached(segment)) {
           cachedSegments.add(segment);
         } else {
           log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());
@@ -275,6 +280,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler
       }
     }
 
+    if (ignored > 0) {
+      log.makeAlert("Ignored misnamed segment cache files on startup.")
+         .addData("numIgnored", ignored)
+         .emit();
+    }
+
     addSegments(
         cachedSegments,
         new DataSegmentChangeCallback()