LUCENE-6530: Refactoring: Simplify ProcessBuilder code at various places

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1684040 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uwe Schindler 2015-06-07 15:49:10 +00:00
parent 4e783ac8c1
commit a8cf56b222
3 changed files with 19 additions and 98 deletions

View File

@ -22,11 +22,13 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.lang.ProcessBuilder.Redirect;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.nio.file.FileVisitResult; import java.nio.file.FileVisitResult;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor; import java.nio.file.SimpleFileVisitor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -96,11 +98,7 @@ public class TestIndexWriterOnJRECrash extends TestNRTThreads {
@SuppressForbidden(reason = "ProcessBuilder requires java.io.File for CWD") @SuppressForbidden(reason = "ProcessBuilder requires java.io.File for CWD")
public void forkTest() throws Exception { public void forkTest() throws Exception {
List<String> cmd = new ArrayList<>(); List<String> cmd = new ArrayList<>();
cmd.add(System.getProperty("java.home") cmd.add(Paths.get(System.getProperty("java.home"), "bin", "java").toString());
+ System.getProperty("file.separator")
+ "bin"
+ System.getProperty("file.separator")
+ "java");
cmd.add("-Xmx512m"); cmd.add("-Xmx512m");
cmd.add("-Dtests.crashmode=true"); cmd.add("-Dtests.crashmode=true");
// passing NIGHTLY to this test makes it run for much longer, easier to catch it in the act... // passing NIGHTLY to this test makes it run for much longer, easier to catch it in the act...
@ -112,19 +110,18 @@ public class TestIndexWriterOnJRECrash extends TestNRTThreads {
cmd.add(System.getProperty("java.class.path")); cmd.add(System.getProperty("java.class.path"));
cmd.add("org.junit.runner.JUnitCore"); cmd.add("org.junit.runner.JUnitCore");
cmd.add(getClass().getName()); cmd.add(getClass().getName());
ProcessBuilder pb = new ProcessBuilder(cmd); ProcessBuilder pb = new ProcessBuilder(cmd)
pb.directory(tempDir.toFile()); .directory(tempDir.toFile())
pb.redirectErrorStream(true); .redirectInput(Redirect.INHERIT)
.redirectErrorStream(true);
Process p = pb.start(); Process p = pb.start();
// We pump everything to stderr. // We pump everything to stderr.
PrintStream childOut = System.err; PrintStream childOut = System.err;
Thread stdoutPumper = ThreadPumper.start(p.getInputStream(), childOut); Thread stdoutPumper = ThreadPumper.start(p.getInputStream(), childOut);
Thread stderrPumper = ThreadPumper.start(p.getErrorStream(), childOut);
if (VERBOSE) childOut.println(">>> Begin subprocess output"); if (VERBOSE) childOut.println(">>> Begin subprocess output");
p.waitFor(); p.waitFor();
stdoutPumper.join(); stdoutPumper.join();
stderrPumper.join();
if (VERBOSE) childOut.println("<<< End subprocess output"); if (VERBOSE) childOut.println("<<< End subprocess output");
} }
@ -135,7 +132,7 @@ public class TestIndexWriterOnJRECrash extends TestNRTThreads {
@Override @Override
public void run() { public void run() {
try { try {
byte [] buffer = new byte [1024]; byte[] buffer = new byte [1024];
int len; int len;
while ((len = from.read(buffer)) != -1) { while ((len = from.read(buffer)) != -1) {
if (VERBOSE) { if (VERBOSE) {

View File

@ -36,7 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -104,7 +103,7 @@ class SolrRecordWriter<K, V> extends RecordWriter<K, V> {
public SolrRecordWriter(TaskAttemptContext context, Path outputShardDir, int batchSize) { public SolrRecordWriter(TaskAttemptContext context, Path outputShardDir, int batchSize) {
this.batchSize = batchSize; this.batchSize = batchSize;
this.batch = new ArrayList(batchSize); this.batch = new ArrayList<>(batchSize);
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
// setLogLevel("org.apache.solr.core", "WARN"); // setLogLevel("org.apache.solr.core", "WARN");
@ -134,9 +133,6 @@ class SolrRecordWriter<K, V> extends RecordWriter<K, V> {
public static EmbeddedSolrServer createEmbeddedSolrServer(Path solrHomeDir, FileSystem fs, Path outputShardDir) public static EmbeddedSolrServer createEmbeddedSolrServer(Path solrHomeDir, FileSystem fs, Path outputShardDir)
throws IOException { throws IOException {
if (solrHomeDir == null) {
throw new IOException("Unable to find solr home setting");
}
LOG.info("Creating embedded Solr server with solrHomeDir: " + solrHomeDir + ", fs: " + fs + ", outputShardDir: " + outputShardDir); LOG.info("Creating embedded Solr server with solrHomeDir: " + solrHomeDir + ", fs: " + fs + ", outputShardDir: " + outputShardDir);
Path solrDataDir = new Path(outputShardDir, "data"); Path solrDataDir = new Path(outputShardDir, "data");
@ -186,7 +182,7 @@ class SolrRecordWriter<K, V> extends RecordWriter<K, V> {
} }
} }
public static void incrementCounter(TaskID taskId, Enum counterName, long incr) { public static void incrementCounter(TaskID taskId, Enum<?> counterName, long incr) {
Reducer<?,?,?,?>.Context context = contextMap.get(taskId); Reducer<?,?,?,?>.Context context = contextMap.get(taskId);
if (context != null) { if (context != null) {
context.getCounter(counterName).increment(incr); context.getCounter(counterName).increment(incr);
@ -199,53 +195,19 @@ class SolrRecordWriter<K, V> extends RecordWriter<K, V> {
} }
public static Path findSolrConfig(Configuration conf) throws IOException { public static Path findSolrConfig(Configuration conf) throws IOException {
Path solrHome = null;
// FIXME when mrunit supports the new cache apis // FIXME when mrunit supports the new cache apis
//URI[] localArchives = context.getCacheArchives(); //URI[] localArchives = context.getCacheArchives();
Path[] localArchives = DistributedCache.getLocalCacheArchives(conf); Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
if (localArchives.length == 0) { for (Path unpackedDir : localArchives) {
if (unpackedDir.getName().equals(SolrOutputFormat.getZipName(conf))) {
LOG.info("Using this unpacked directory as solr home: {}", unpackedDir);
return unpackedDir;
}
}
throw new IOException(String.format(Locale.ENGLISH, throw new IOException(String.format(Locale.ENGLISH,
"No local cache archives, where is %s:%s", SolrOutputFormat "No local cache archives, where is %s:%s", SolrOutputFormat
.getSetupOk(), SolrOutputFormat.getZipName(conf))); .getSetupOk(), SolrOutputFormat.getZipName(conf)));
} }
for (Path unpackedDir : localArchives) {
// Only logged if debugging
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(Locale.ENGLISH, "Examining unpack directory %s for %s",
unpackedDir, SolrOutputFormat.getZipName(conf)));
ProcessBuilder lsCmd = new ProcessBuilder(new String[] { "/bin/ls",
"-lR", unpackedDir.toString() });
lsCmd.redirectErrorStream();
Process ls = lsCmd.start();
byte[] buf = new byte[16 * 1024];
InputStream all = ls.getInputStream();
try {
int count;
while ((count = all.read(buf)) >= 0) {
System.err.write(buf, 0, count);
}
} catch (IOException ignore) {
} finally {
all.close();
}
String exitValue;
try {
exitValue = String.valueOf(ls.waitFor());
} catch (InterruptedException e) {
exitValue = "interrupted";
}
System.err.format(Locale.ENGLISH, "Exit value of 'ls -lR' is %s%n", exitValue);
}
if (unpackedDir.getName().equals(SolrOutputFormat.getZipName(conf))) {
LOG.info("Using this unpacked directory as solr home: {}", unpackedDir);
solrHome = unpackedDir;
break;
}
}
return solrHome;
}
/** /**
* Write a record. This method accumulates records in to a batch, and when * Write a record. This method accumulates records in to a batch, and when

View File

@ -19,9 +19,6 @@ package org.apache.solr.cloud;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -37,31 +34,6 @@ public class IpTables {
.getLogger(IpTables.class); .getLogger(IpTables.class);
private static boolean ENABLED = Boolean.getBoolean("solr.tests.use.iptables"); private static boolean ENABLED = Boolean.getBoolean("solr.tests.use.iptables");
static class ThreadPumper {
public ThreadPumper() {}
public static Thread start(final InputStream from, final OutputStream to, final boolean verbose) {
Thread t = new Thread() {
@Override
public void run() {
try {
byte [] buffer = new byte [1024];
int len;
while ((len = from.read(buffer)) != -1) {
if (verbose) {
to.write(buffer, 0, len);
}
}
} catch (IOException e) {
System.err.println("Couldn't pipe from the forked process: " + e.toString());
}
}
};
t.start();
return t;
}
}
private static Set<Integer> BLOCK_PORTS = Collections.synchronizedSet(new HashSet<Integer>()); private static Set<Integer> BLOCK_PORTS = Collections.synchronizedSet(new HashSet<Integer>());
@ -97,20 +69,10 @@ public class IpTables {
} }
} }
private static void runCmd(String[] cmd) throws IOException, InterruptedException { private static void runCmd(String... cmd) throws IOException, InterruptedException {
ProcessBuilder pb = new ProcessBuilder(cmd); final int exitCode = new ProcessBuilder(cmd).inheritIO().start().waitFor();
if (exitCode != 0) {
pb.redirectErrorStream(true); throw new IOException("iptables process did not exit successfully, exit code was: " + exitCode);
Process p = pb.start(); }
// We pump everything to stderr.
PrintStream childOut = System.err;
Thread stdoutPumper = ThreadPumper.start(p.getInputStream(), childOut, true);
Thread stderrPumper = ThreadPumper.start(p.getErrorStream(), childOut, true);
if (true) childOut.println(">>> Begin subprocess output");
p.waitFor();
stdoutPumper.join();
stderrPumper.join();
if (true) childOut.println("<<< End subprocess output");
} }
} }