Make optional Peon "stdin" check (#4760)

This commit is contained in:
Egor Riashin 2017-09-12 00:37:01 +03:00 committed by Roman Leventov
parent 23c0357816
commit 6f3e52b3db
2 changed files with 37 additions and 20 deletions

View File

@ -139,29 +139,31 @@ public class ExecutorLifecycle
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
// Spawn monitor thread to keep a watch on parent's stdin if (taskExecutorConfig.isParentStreamDefined()) {
// If stdin reaches eof, the parent is gone, and we should shut down // Spawn monitor thread to keep a watch on parent's stdin
parentMonitorExec.submit( // If stdin reaches eof, the parent is gone, and we should shut down
new Runnable() parentMonitorExec.submit(
{ new Runnable()
@Override
public void run()
{ {
try { @Override
while (parentStream.read() != -1) { public void run()
// Toss the byte {
try {
while (parentStream.read() != -1) {
// Toss the byte
}
}
catch (Exception e) {
log.error(e, "Failed to read from stdin");
} }
}
catch (Exception e) {
log.error(e, "Failed to read from stdin");
}
// Kind of gross, but best way to kill the JVM as far as I know // Kind of gross, but best way to kill the JVM as far as I know
log.info("Triggering JVM shutdown."); log.info("Triggering JVM shutdown.");
System.exit(2); System.exit(2);
}
} }
} );
); }
// Won't hurt in remote mode, and is required for setting up locks in local mode: // Won't hurt in remote mode, and is required for setting up locks in local mode:
try { try {

View File

@ -20,7 +20,6 @@
package io.druid.indexing.worker.executor; package io.druid.indexing.worker.executor;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
@ -43,6 +42,22 @@ public class ExecutorLifecycleConfig
@JsonProperty @JsonProperty
@Pattern(regexp = "\\{stdin\\}") @Pattern(regexp = "\\{stdin\\}")
private String parentStreamName = "stdin"; private String parentStreamName = "stdin";
@JsonProperty
private boolean parentStreamDefined = true;
/**
* Should parent stream be monitored.
*/
public boolean isParentStreamDefined()
{
return parentStreamDefined;
}
public ExecutorLifecycleConfig setParentStreamDefined(boolean parentStreamDefined)
{
this.parentStreamDefined = parentStreamDefined;
return this;
}
public File getTaskFile() public File getTaskFile()
{ {