handle supervisor spec metadata failures (#3456)

Close the Kafka consumer if the supervisor fails to start.
This commit is contained in:
Parag Jain 2016-10-04 12:15:28 -05:00 committed by Slim
parent 43cdc675c7
commit e419407eba
2 changed files with 34 additions and 21 deletions

View File

@ -310,28 +310,30 @@ public class KafkaSupervisor implements Supervisor
}
}
);
firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);
started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
dataSource,
ioConfig.getStartDelay(),
spec.toString()
);
}
catch (Exception e) {
if (consumer != null) {
consumer.close();
}
log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
.emit();
throw Throwables.propagate(e);
}
firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);
started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
dataSource,
ioConfig.getStartDelay(),
spec.toString()
);
}
}

View File

@ -21,6 +21,7 @@ package io.druid.indexing.overlord.supervisor;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
@ -155,7 +156,7 @@ public class SupervisorManager
/**
* Stops a supervisor with a given id and then removes it from the list.
* <p>
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
*
@ -179,7 +180,7 @@ public class SupervisorManager
/**
* Creates a supervisor from the provided spec and starts it if there is not already a supervisor with that id.
* <p>
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
*
@ -192,13 +193,23 @@ public class SupervisorManager
return false;
}
Supervisor supervisor = spec.createSupervisor();
supervisor.start(); // try starting the supervisor first so we don't persist a bad spec
if (persistSpec) {
metadataSupervisorManager.insert(id, spec);
}
Supervisor supervisor = null;
try {
supervisor = spec.createSupervisor();
supervisor.start();
}
catch (Exception e) {
// Supervisor creation or start failed; write a tombstone only when trying to start a new supervisor.
if (persistSpec) {
metadataSupervisorManager.insert(id, new NoopSupervisorSpec());
}
Throwables.propagate(e);
}
supervisors.put(id, Pair.of(supervisor, spec));
return true;
}