add register abilities to mapper

This commit is contained in:
Fangjin Yang 2012-11-05 18:31:23 -08:00
parent 9fbee29eb4
commit 2ae0a15b5a
1 changed files with 34 additions and 15 deletions

View File

@ -150,7 +150,8 @@ public class IndexGeneratorJob implements Jobby
boolean success = job.waitForCompletion(true);
Counter invalidRowCount = job.getCounters().findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
Counter invalidRowCount = job.getCounters()
.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
jobStats.setInvalidRowCount(invalidRowCount.getValue());
return success;
@ -173,6 +174,10 @@ public class IndexGeneratorJob implements Jobby
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
parser = config.getDataSpec().getParser();
timestampConverter = ParserUtils.createTimestampParser(config.getTimestampFormat());
for (Registererer registererer : config.getRegistererers()) {
registererer.register();
}
}
@Override
@ -189,12 +194,11 @@ public class IndexGeneratorJob implements Jobby
try {
timestamp = timestampConverter.apply(tsStr);
}
catch(IllegalArgumentException e) {
if(config.isIgnoreInvalidRows()) {
catch (IllegalArgumentException e) {
if (config.isIgnoreInvalidRows()) {
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row
}
else {
} else {
throw e;
}
}
@ -371,24 +375,28 @@ public class IndexGeneratorJob implements Jobby
if (toMerge.size() == 0) {
mergedBase = new File(baseFlushFile, "merged");
IndexMerger.persist(index, interval, mergedBase, new IndexMerger.ProgressIndicator()
IndexMerger.persist(
index, interval, mergedBase, new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
});
}
);
} else {
final File finalFile = new File(baseFlushFile, "final");
IndexMerger.persist(index, interval, finalFile, new IndexMerger.ProgressIndicator()
IndexMerger.persist(
index, interval, finalFile, new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
});
}
);
toMerge.add(finalFile);
for (File file : toMerge) {
@ -539,14 +547,24 @@ public class IndexGeneratorJob implements Jobby
if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime()
|| zipFile.getLen() != finalIndexZipFile.getLen()) {
log.info("File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]",
finalIndexZipFile.getPath(), new DateTime(finalIndexZipFile.getModificationTime()), finalIndexZipFile.getLen(),
zipFile.getPath(), new DateTime(zipFile.getModificationTime()), zipFile.getLen());
log.info(
"File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]",
finalIndexZipFile.getPath(),
new DateTime(finalIndexZipFile.getModificationTime()),
finalIndexZipFile.getLen(),
zipFile.getPath(),
new DateTime(zipFile.getModificationTime()),
zipFile.getLen()
);
outputFS.delete(finalIndexZipFilePath, false);
needRename = true;
} else {
log.info("File[%s / %s / %sB] existed and will be kept",
finalIndexZipFile.getPath(), new DateTime(finalIndexZipFile.getModificationTime()), finalIndexZipFile.getLen());
log.info(
"File[%s / %s / %sB] existed and will be kept",
finalIndexZipFile.getPath(),
new DateTime(finalIndexZipFile.getModificationTime()),
finalIndexZipFile.getLen()
);
needRename = false;
}
} else {
@ -632,7 +650,8 @@ public class IndexGeneratorJob implements Jobby
}
}
public static class IndexGeneratorStats {
public static class IndexGeneratorStats
{
private long invalidRowCount = 0;
public long getInvalidRowCount()