SOLR-5824: Merge up Solr MapReduce contrib code to latest external changes. Includes a few minor bug fixes.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1579318 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2014-03-19 17:52:10 +00:00
parent 7cee4be199
commit 84e2979d81
4 changed files with 35 additions and 9 deletions

solr/CHANGES.txt

@@ -283,6 +283,10 @@ Bug Fixes
 * SOLR-5782: The full MapReduceIndexer help text does not display when using
   --help. (Mark Miller, Wolfgang Hoschek)
+
+* SOLR-5824: Merge up Solr MapReduce contrib code to latest external changes.
+  Includes a few minor bug fixes.
+  (Mark Miller)

 ================== 4.7.0 ==================
 Versions of Major Components

MapReduceIndexerTool.java

@@ -322,11 +322,12 @@ public class MapReduceIndexerTool extends Configured implements Tool {
     Argument reducersArg = parser.addArgument("--reducers")
       .metavar("INTEGER")
       .type(Integer.class)
-      .choices(new RangeArgumentChoice(-1, Integer.MAX_VALUE)) // TODO: also support X% syntax where X is an integer
+      .choices(new RangeArgumentChoice(-2, Integer.MAX_VALUE)) // TODO: also support X% syntax where X is an integer
       .setDefault(-1)
       .help("Tuning knob that indicates the number of reducers to index into. " +
+          "0 is reserved for a mapper-only feature that may ship in a future release. " +
           "-1 indicates use all reduce slots available on the cluster. " +
-          "0 indicates use one reducer per output shard, which disables the mtree merge MR algorithm. " +
+          "-2 indicates use one reducer per output shard, which disables the mtree merge MR algorithm. " +
           "The mtree merge MR algorithm improves scalability by spreading load " +
           "(in particular CPU load) among a number of parallel reducers that can be much larger than the number " +
           "of solr shards expected by the user. It can be seen as an extension of concurrent lucene merges " +
@@ -511,6 +512,9 @@ public class MapReduceIndexerTool extends Configured implements Tool {
     opts.collection = ns.getString(collectionArg.getDest());

     try {
+      if (opts.reducers == 0) {
+        throw new ArgumentParserException("--reducers must not be zero", parser);
+      }
       verifyGoLiveArgs(opts, parser);
     } catch (ArgumentParserException e) {
       parser.handleError(e);
@@ -606,8 +610,7 @@ public class MapReduceIndexerTool extends Configured implements Tool {
   /** API for Java clients; visible for testing; may become a public API eventually */
   int run(Options options) throws Exception {
-    if ("local".equals(getConf().get("mapred.job.tracker"))) {
+    if (getConf().getBoolean("isMR1", false) && "local".equals(getConf().get("mapred.job.tracker"))) {
       throw new IllegalStateException(
         "Running with LocalJobRunner (i.e. all of Hadoop inside a single JVM) is not supported " +
         "because LocalJobRunner does not (yet) implement the Hadoop Distributed Cache feature, " +
@@ -884,11 +887,14 @@ public class MapReduceIndexerTool extends Configured implements Tool {
     //reducers = job.getCluster().getClusterStatus().getReduceSlotCapacity(); // Yarn only
     LOG.info("Cluster reports {} reduce slots", reducers);

-    if (options.reducers == 0) {
+    if (options.reducers == -2) {
       reducers = options.shards;
     } else if (options.reducers == -1) {
       reducers = Math.min(reducers, realMappers); // no need to use many reducers when using few mappers
     } else {
+      if (options.reducers == 0) {
+        throw new IllegalStateException("Illegal zero reducers");
+      }
       reducers = options.reducers;
     }
     reducers = Math.max(reducers, options.shards);
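
Note: the two hunks above redefine --reducers. The "one reducer per output shard" mode moves from 0 to -2, and 0 becomes illegal (reserved for a possible future mapper-only feature). A minimal self-contained sketch of the resulting resolution logic; the class, method, and parameter names are hypothetical illustrations, not part of the commit:

// Hypothetical sketch mirroring the new --reducers resolution logic above.
public class ReducerResolutionSketch {
  static int resolveReducers(int requested, int shards, int clusterReduceSlots, int realMappers) {
    int reducers;
    if (requested == -2) {
      reducers = shards;          // one reducer per output shard; disables the mtree merge algorithm
    } else if (requested == -1) {
      reducers = Math.min(clusterReduceSlots, realMappers); // all slots, but no more than mappers
    } else {
      if (requested == 0) {
        throw new IllegalStateException("Illegal zero reducers"); // 0 reserved for a future mapper-only mode
      }
      reducers = requested;       // explicit user choice
    }
    return Math.max(reducers, shards); // never fewer reducers than output shards
  }

  public static void main(String[] args) {
    System.out.println(resolveReducers(-2, 4, 100, 20)); // 4: one reducer per shard
    System.out.println(resolveReducers(-1, 4, 100, 20)); // 20: capped by mapper count
  }
}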
@@ -925,8 +931,8 @@ public class MapReduceIndexerTool extends Configured implements Tool {
       if (inputFileFs.exists(inputFile)) {
         PathFilter pathFilter = new PathFilter() {
           @Override
-          public boolean accept(Path path) {
-            return !path.getName().startsWith("."); // ignore "hidden" files and dirs
+          public boolean accept(Path path) { // ignore "hidden" files and dirs
+            return !(path.getName().startsWith(".") || path.getName().startsWith("_"));
          }
         };
         numFiles += addInputFilesRecursively(inputFile, writer, inputFileFs, pathFilter);
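
Note: the second startsWith clause is the substance of this hunk. Besides dot-prefixed hidden files, the filter now also skips underscore-prefixed names, which Hadoop uses for job bookkeeping output such as _SUCCESS and _logs. A small standalone demo of the filter; the demo class is hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Hypothetical demo of the filter logic in the hunk above.
public class HiddenFileFilterDemo {
  public static void main(String[] args) {
    PathFilter filter = new PathFilter() {
      @Override
      public boolean accept(Path path) { // ignore "hidden" files and dirs
        return !(path.getName().startsWith(".") || path.getName().startsWith("_"));
      }
    };
    System.out.println(filter.accept(new Path("/in/part-00000"))); // true: regular data file
    System.out.println(filter.accept(new Path("/in/_SUCCESS")));   // false: Hadoop job marker
    System.out.println(filter.accept(new Path("/in/.stage.tmp"))); // false: hidden file
  }
}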
@@ -1084,7 +1090,7 @@ public class MapReduceIndexerTool extends Configured implements Tool {
      * like this:
      *
      * ... caused by compilation failed: mfm:///MyJavaClass1.java:2: package
-     * com.cloudera.cdk.morphline.api does not exist
+     * org.kitesdk.morphline.api does not exist
      */
     LOG.trace("dryRun: java.class.path: {}", System.getProperty("java.class.path"));
     String fullClassPath = "";

SolrRecordWriter.java

@@ -157,7 +157,7 @@ class SolrRecordWriter<K, V> extends RecordWriter<K, V> {
       System.setProperty("solr.lock.type", "hdfs");
       System.setProperty("solr.hdfs.nrtcachingdirectory", "false");
       System.setProperty("solr.hdfs.blockcache.enabled", "false");
-      System.setProperty("solr.autoCommit.maxTime", "-1");
+      System.setProperty("solr.autoCommit.maxTime", "600000");
       System.setProperty("solr.autoSoftCommit.maxTime", "-1");

       CoreContainer container = new CoreContainer(loader);
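
Note: the change above turns hard auto-commits back on for the embedded Solr core each reducer writes with; 600000 ms is ten minutes. A commented restatement of the settings, where the helper class and the rationale comments are assumptions, not part of the commit:

// Hypothetical helper restating the settings above with assumed rationale.
public final class EmbeddedSolrProps {
  public static void apply() {
    System.setProperty("solr.lock.type", "hdfs");                 // HDFS-aware index locking
    System.setProperty("solr.hdfs.nrtcachingdirectory", "false"); // NRT caching not useful for batch builds
    System.setProperty("solr.hdfs.blockcache.enabled", "false");  // skip the block cache while bulk writing
    System.setProperty("solr.autoCommit.maxTime", "600000");      // hard commit at most every 10 min; bounds tlog growth
    System.setProperty("solr.autoSoftCommit.maxTime", "-1");      // soft commits off; no searchers needed mid-job
  }
}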

ZooKeeperInspector.java

@@ -194,7 +194,23 @@ final class ZooKeeperInspector {
       dir = confDir.getParentFile();
     }
     FileUtils.writeStringToFile(new File(dir, "solr.xml"), "<solr><cores><core name=\"collection1\" instanceDir=\".\" /></cores></solr>", "UTF-8");
+    verifyConfigDir(confDir);
     return dir;
   }
+
+  private void verifyConfigDir(File confDir) throws IOException {
+    File solrConfigFile = new File(confDir, "solrconfig.xml");
+    if (!solrConfigFile.exists()) {
+      throw new IOException("Detected invalid Solr config dir in ZooKeeper - Reason: File not found: "
+          + solrConfigFile.getName());
+    }
+    if (!solrConfigFile.isFile()) {
+      throw new IOException("Detected invalid Solr config dir in ZooKeeper - Reason: Not a file: "
+          + solrConfigFile.getName());
+    }
+    if (!solrConfigFile.canRead()) {
+      throw new IOException("Insufficient permissions to read file: " + solrConfigFile);
+    }
+  }
 }
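
Note: the new verifyConfigDir method makes a config set fetched from ZooKeeper fail fast when solrconfig.xml is missing, is not a regular file, or is unreadable. A minimal usage sketch applying the same checks; the driver class and path are hypothetical:

import java.io.File;
import java.io.IOException;

// Hypothetical driver applying the same checks as ZooKeeperInspector.verifyConfigDir().
public class VerifyConfigDirDemo {
  public static void main(String[] args) throws IOException {
    File confDir = new File("/tmp/downloaded-conf"); // e.g. a config set pulled down from ZooKeeper
    File solrConfigFile = new File(confDir, "solrconfig.xml");
    if (!solrConfigFile.exists()) {
      throw new IOException("Detected invalid Solr config dir in ZooKeeper - Reason: File not found: "
          + solrConfigFile.getName());
    }
    if (!solrConfigFile.isFile()) {
      throw new IOException("Detected invalid Solr config dir in ZooKeeper - Reason: Not a file: "
          + solrConfigFile.getName());
    }
    if (!solrConfigFile.canRead()) {
      throw new IOException("Insufficient permissions to read file: " + solrConfigFile);
    }
    System.out.println("Config dir looks valid: " + confDir);
  }
}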