SOLR-1301: Merge in latest solr-map-reduce updates.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1547871 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2013-12-04 18:28:45 +00:00
parent 1ae2e523df
commit b2a253a930
12 changed files with 481 additions and 88 deletions

View File

@ -0,0 +1,76 @@
//The MIT License
//
// Copyright (c) 2003 Ron Alford, Mike Grove, Bijan Parsia, Evren Sirin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
package org.apache.solr.hadoop;
import java.util.Comparator;
/**
* This is a comparator that performs a mixed alphabetical and numeric comparison. For
* example, given the list {"test10", "test2", "test150", "test25", "test1"},
* the ordering one generally expects is {"test1", "test2", "test10", "test25",
* "test150"}. Standard lexicographic ordering does not do that: "test10" comes
* before "test2". This class is provided to overcome that problem. This
* functionality is useful for sorting benchmark files (like the ones in the
* DL-benchmark-suite) from smallest to largest. Comparisons are done on the
* String values returned by toString(), so care should be taken when this
* comparator is used to sort arbitrary Java objects.
*
*/
final class AlphaNumericComparator implements Comparator {
public AlphaNumericComparator() {
}
public int compare(Object o1, Object o2) {
String s1 = o1.toString();
String s2 = o2.toString();
int n1 = s1.length(), n2 = s2.length();
int i1 = 0, i2 = 0;
while (i1 < n1 && i2 < n2) {
int p1 = i1;
int p2 = i2;
char c1 = s1.charAt(i1++);
char c2 = s2.charAt(i2++);
if (c1 != c2) {
if (Character.isDigit(c1) && Character.isDigit(c2)) {
// Both strings diverge at a digit: consume the full digit run from each side
// and compare the runs numerically instead of character by character.
int value1 = 0, value2 = 0;
while (i1 < n1 && Character.isDigit(c1 = s1.charAt(i1))) {
i1++;
}
value1 = Integer.parseInt(s1.substring(p1, i1));
while (i2 < n2 && Character.isDigit(c2 = s2.charAt(i2))) {
i2++;
}
value2 = Integer.parseInt(s2.substring(p2, i2));
if (value1 != value2) {
return value1 - value2;
}
}
// Not both digits (or the digit runs are numerically equal): decide by character difference.
return c1 - c2;
}
}
}
return n1 - n2;
}
}
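
A minimal usage sketch (illustrative only, not part of this patch), showing the ordering the comparator produces on shard-style names; the sample values mirror the new AlphaNumericComparatorTest further down in this commit:

// Illustrative sketch, not part of the commit: sorting shard-style names numerically.
String[] names = { "shard10", "shard2", "shard1" };
java.util.Arrays.sort(names, new AlphaNumericComparator());
// names is now { "shard1", "shard2", "shard10" } rather than the
// lexicographical { "shard1", "shard10", "shard2" }.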

View File

@ -196,8 +196,8 @@ class BatchWriter {
batchPool.awaitTermination(5, TimeUnit.SECONDS);
}
}
//reporter.setStatus("Committing Solr");
//solr.commit(true, false);
context.setStatus("Committing Solr Phase 1");
solr.commit(true, false);
context.setStatus("Optimizing Solr");
int maxSegments = context.getConfiguration().getInt(SolrOutputFormat.SOLR_RECORD_WRITER_MAX_SEGMENTS, 1);
LOG.info("Optimizing Solr: forcing merge down to {} segments", maxSegments);
@ -206,9 +206,9 @@ class BatchWriter {
context.getCounter(SolrCounters.class.getName(), SolrCounters.PHYSICAL_REDUCER_MERGE_TIME.toString()).increment(System.currentTimeMillis() - start);
float secs = (System.currentTimeMillis() - start) / 1000.0f;
LOG.info("Optimizing Solr: done forcing merge down to {} segments in {} secs", maxSegments, secs);
context.setStatus("Committing Solr Phase 2");
solr.commit(true, false);
context.setStatus("Shutting down Solr");
// TODO is core close needed? - according to TestEmbeddedSolrServer it's not...
//core.close();
solr.shutdown();
}
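
For context, the close sequence above now amounts to the following (a sketch assuming the SolrJ 4.x SolrServer API; the force-merge call between the two commits is the "Optimizing Solr" step logged above and may be spelled slightly differently in BatchWriter):

// Sketch only (SolrJ 4.x SolrServer API); mirrors the commit/optimize/commit/shutdown order above.
solr.commit(true, false);                  // phase 1: flush all buffered documents
solr.optimize(true, false, maxSegments);   // force-merge the index down to maxSegments
solr.commit(true, false);                  // phase 2: publish the merged segments
solr.shutdown();                           // release the embedded server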

View File

@ -33,10 +33,13 @@ import java.io.Writer;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
@ -80,6 +83,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.cdk.morphline.base.Fields;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
/**
@ -107,6 +113,10 @@ public class MapReduceIndexerTool extends Configured implements Tool {
*/
static final class MyArgumentParser {
private static final String SHOW_NON_SOLR_CLOUD = "--show-non-solr-cloud";
private boolean showNonSolrCloud = false;
/**
* Parses the given command line arguments.
*
@ -123,6 +133,8 @@ public class MapReduceIndexerTool extends Configured implements Tool {
args = new String[] { "--help" };
}
showNonSolrCloud = Arrays.asList(args).contains(SHOW_NON_SOLR_CLOUD); // intercept it first
ArgumentParser parser = ArgumentParsers
.newArgumentParser("hadoop [GenericOptions]... jar search-mr-*-job.jar " + MapReduceIndexerTool.class.getName(), false)
.defaultHelp(true)
@ -297,7 +309,7 @@ public class MapReduceIndexerTool extends Configured implements Tool {
"specified by --morphline-file. If the --morphline-id option is ommitted the first (i.e. " +
"top-most) morphline within the config file is used. Example: morphline1");
Argument solrHomeDirArg = parser.addArgument("--solr-home-dir")
Argument solrHomeDirArg = nonSolrCloud(parser.addArgument("--solr-home-dir")
.metavar("DIR")
.type(new FileArgumentType() {
@Override
@ -312,7 +324,7 @@ public class MapReduceIndexerTool extends Configured implements Tool {
.required(false)
.help("Relative or absolute path to a local dir containing Solr conf/ dir and in particular " +
"conf/solrconfig.xml and optionally also lib/ dir. This directory will be uploaded to each MR task. " +
"Example: src/test/resources/solr/minimr");
"Example: src/test/resources/solr/minimr"));
Argument updateConflictResolverArg = parser.addArgument("--update-conflict-resolver")
.metavar("FQCN")
@ -404,25 +416,20 @@ public class MapReduceIndexerTool extends Configured implements Tool {
.action(Arguments.storeTrue())
.help("Turn on verbose output.");
parser.addArgument(SHOW_NON_SOLR_CLOUD)
.action(Arguments.storeTrue())
.help("Also show options for Non-SolrCloud mode as part of --help.");
ArgumentGroup clusterInfoGroup = parser
.addArgumentGroup("Cluster arguments")
.description(
"Arguments that provide information about your Solr cluster. "
+ "If you are not using --go-live, pass the --shards argument. If you are building shards for "
+ "a Non-SolrCloud cluster, pass the --shard-url argument one or more times. To build indexes for"
+ " a replicated cluster with --shard-url, pass replica urls consecutively and also pass --shards. "
+ "If you are building shards for a SolrCloud cluster, pass the --zk-host argument. "
+ "Using --go-live requires either --shard-url or --zk-host.");
+ nonSolrCloud("If you are building shards for a SolrCloud cluster, pass the --zk-host argument. "
+ "If you are building shards for "
+ "a Non-SolrCloud cluster, pass the --shard-url argument one or more times. To build indexes for "
+ "a replicated Non-SolrCloud cluster with --shard-url, pass replica urls consecutively and also pass --shards. "
+ "Using --go-live requires either --zk-host or --shard-url."));
Argument shardUrlsArg = clusterInfoGroup.addArgument("--shard-url")
.metavar("URL")
.type(String.class)
.action(Arguments.append())
.help("Solr URL to merge resulting shard into if using --go-live. " +
"Example: http://solr001.mycompany.com:8983/solr/collection1. " +
"Multiple --shard-url arguments can be specified, one for each desired shard. " +
"If you are merging shards into a SolrCloud cluster, use --zk-host instead.");
Argument zkHostArg = clusterInfoGroup.addArgument("--zk-host")
.metavar("STRING")
.type(String.class)
@ -444,15 +451,24 @@ public class MapReduceIndexerTool extends Configured implements Tool {
+ "would be relative to this root - i.e. getting/setting/etc... "
+ "'/foo/bar' would result in operations being run on "
+ "'/solr/foo/bar' (from the server perspective).\n"
+ "\n"
+ nonSolrCloud("\n"
+ "If --solr-home-dir is not specified, the Solr home directory for the collection "
+ "will be downloaded from this ZooKeeper ensemble.");
+ "will be downloaded from this ZooKeeper ensemble."));
Argument shardsArg = clusterInfoGroup.addArgument("--shards")
Argument shardUrlsArg = nonSolrCloud(clusterInfoGroup.addArgument("--shard-url")
.metavar("URL")
.type(String.class)
.action(Arguments.append())
.help("Solr URL to merge resulting shard into if using --go-live. " +
"Example: http://solr001.mycompany.com:8983/solr/collection1. " +
"Multiple --shard-url arguments can be specified, one for each desired shard. " +
"If you are merging shards into a SolrCloud cluster, use --zk-host instead."));
Argument shardsArg = nonSolrCloud(clusterInfoGroup.addArgument("--shards")
.metavar("INTEGER")
.type(Integer.class)
.choices(new RangeArgumentChoice(1, Integer.MAX_VALUE))
.help("Number of output shards to generate.");
.help("Number of output shards to generate."));
ArgumentGroup goLiveGroup = parser.addArgumentGroup("Go live arguments")
.description("Arguments for merging the shards that are built into a live Solr cluster. " +
@ -462,8 +478,8 @@ public class MapReduceIndexerTool extends Configured implements Tool {
.action(Arguments.storeTrue())
.help("Allows you to optionally merge the final index shards into a live Solr cluster after they are built. " +
"You can pass the ZooKeeper address with --zk-host and the relevant cluster information will be auto detected. " +
"If you are not using a SolrCloud cluster, --shard-url arguments can be used to specify each SolrCore to merge " +
"each shard into.");
nonSolrCloud("If you are not using a SolrCloud cluster, --shard-url arguments can be used to specify each SolrCore to merge " +
"each shard into."));
Argument collectionArg = goLiveGroup.addArgument("--collection")
.metavar("STRING")
@ -538,6 +554,15 @@ public class MapReduceIndexerTool extends Configured implements Tool {
return null;
}
// make it a "hidden" option, i.e. the option is functional and enabled but not shown in --help output
private Argument nonSolrCloud(Argument arg) {
return showNonSolrCloud ? arg : arg.help(FeatureControl.SUPPRESS);
}
private String nonSolrCloud(String msg) {
return showNonSolrCloud ? msg : "";
}
/** Marker trick to prevent processing of any remaining arguments once --help option has been parsed */
private static final class FoundHelpArgument extends RuntimeException {
}
@ -785,7 +810,7 @@ public class MapReduceIndexerTool extends Configured implements Tool {
job.setOutputValueClass(SolrInputDocumentWritable.class);
LOG.info("Indexing {} files using {} real mappers into {} reducers", new Object[] {numFiles, realMappers, reducers});
startTime = System.currentTimeMillis();
if (!waitForCompletion(job, true)) {
if (!waitForCompletion(job, options.isVerbose)) {
return -1; // job failed
}
@ -826,6 +851,9 @@ public class MapReduceIndexerTool extends Configured implements Tool {
if (!waitForCompletion(job, options.isVerbose)) {
return -1; // job failed
}
if (!renameTreeMergeShardDirs(outputTreeMergeStep, job, fs)) {
return -1;
}
secs = (System.currentTimeMillis() - startTime) / 1000.0f;
LOG.info("MTree merge iteration {}/{}: Done. Merging {} shards into {} shards using fanout {} took {} secs",
new Object[] {mtreeMergeIteration, mtreeMergeIterations, reducers, (reducers / options.fanout), options.fanout, secs});
@ -1182,10 +1210,102 @@ public class MapReduceIndexerTool extends Configured implements Tool {
throw new IllegalStateException("Not a directory: " + dir.getPath());
}
}
Arrays.sort(dirs); // FIXME: handle more than 99999 shards (need numeric sort rather than lexicographical sort)
// use alphanumeric sort (rather than lexicographical sort) to properly handle more than 99999 shards
Arrays.sort(dirs, new Comparator<FileStatus>() {
@Override
public int compare(FileStatus f1, FileStatus f2) {
return new AlphaNumericComparator().compare(f1.getPath().getName(), f2.getPath().getName());
}
});
return dirs;
}
/*
* You can run MapReduceIndexerTool in SolrCloud mode, and once the MR job completes you can use
* the standard SolrJ SolrCloud API to send document updates and deletes to SolrCloud; those updates
* and deletes are routed to the right Solr shards and work just fine.
*
* The MapReduce framework doesn't guarantee that input split N goes to the map task with
* taskId = N. The JobTracker and YARN schedule and assign tasks with data locality in mind,
* but without regard to the input split's position within the overall list of input splits. In
* other words, split# != taskId can be true.
*
* To deal with this issue, our mapper tasks write a little auxiliary metadata file (per task)
* that tells the job driver which taskId processed which split#. Once the mapper-only job is
* completed, the job driver renames the output dirs such that the dir name contains the true solr
* shard id, based on these auxiliary files.
*
* This way each doc gets assigned to the right Solr shard, even when #reducers > #solrshards.
*
* Example for a merge with fanout = 2 (two input shards per merge task):
*
* part-m-00000 and part-m-00001 go to outputShardNum = 0 and will end up in merged part-m-00000
* part-m-00002 and part-m-00003 go to outputShardNum = 1 and will end up in merged part-m-00001
* part-m-00004 and part-m-00005 go to outputShardNum = 2 and will end up in merged part-m-00002
* ... and so on
*
* Also see run() method above where it uses NLineInputFormat.setNumLinesPerSplit(job,
* options.fanout)
*
* Also see TreeMergeOutputFormat.TreeMergeRecordWriter.writeShardNumberFile()
*/
private boolean renameTreeMergeShardDirs(Path outputTreeMergeStep, Job job, FileSystem fs) throws IOException {
final String dirPrefix = SolrOutputFormat.getOutputName(job);
FileStatus[] dirs = fs.listStatus(outputTreeMergeStep, new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith(dirPrefix);
}
});
for (FileStatus dir : dirs) {
if (!dir.isDirectory()) {
throw new IllegalStateException("Not a directory: " + dir.getPath());
}
}
// Example: rename part-m-00004 to _part-m-00004
for (FileStatus dir : dirs) {
Path path = dir.getPath();
Path renamedPath = new Path(path.getParent(), "_" + path.getName());
if (!rename(path, renamedPath, fs)) {
return false;
}
}
// Example: rename _part-m-00004 to part-m-00002
for (FileStatus dir : dirs) {
Path path = dir.getPath();
Path renamedPath = new Path(path.getParent(), "_" + path.getName());
// read auxiliary metadata file (per task) that tells which taskId
// processed which split# aka solrShard
Path solrShardNumberFile = new Path(renamedPath, TreeMergeMapper.SOLR_SHARD_NUMBER);
InputStream in = fs.open(solrShardNumberFile);
byte[] bytes = ByteStreams.toByteArray(in);
in.close();
Preconditions.checkArgument(bytes.length > 0);
int solrShard = Integer.parseInt(new String(bytes, Charsets.UTF_8));
if (!delete(solrShardNumberFile, false, fs)) {
return false;
}
// same as FileOutputFormat.NUMBER_FORMAT
NumberFormat numberFormat = NumberFormat.getInstance(Locale.ENGLISH);
numberFormat.setMinimumIntegerDigits(5);
numberFormat.setGroupingUsed(false);
Path finalPath = new Path(renamedPath.getParent(), dirPrefix + "-m-" + numberFormat.format(solrShard));
LOG.info("MTree merge renaming solr shard: " + solrShard + " from dir: " + dir.getPath() + " to dir: " + finalPath);
if (!rename(renamedPath, finalPath, fs)) {
return false;
}
}
return true;
}
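
A small worked sketch of the renaming arithmetic above (illustrative only, assuming fanout = 2): each tree merge task N records outputShardNum = N / fanout in its _solrShardNumber file, and the driver then renames the task's output dir to the zero-padded shard name, so tasks 4 and 5 both contribute to part-m-00002.

// Illustrative sketch, not part of the patch: mapping merge taskIds to final shard dir names.
int fanout = 2;                            // each tree merge task merges 'fanout' input shards
NumberFormat fmt = NumberFormat.getInstance(Locale.ENGLISH);
fmt.setMinimumIntegerDigits(5);            // same zero padding as FileOutputFormat.NUMBER_FORMAT
fmt.setGroupingUsed(false);
for (int taskId = 0; taskId < 6; taskId++) {
int outputShardNum = taskId / fanout;      // tasks 4 and 5 both map to shard 2
System.out.println("task " + taskId + " -> part-m-" + fmt.format(outputShardNum));
}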
private static void verifyGoLiveArgs(Options opts, ArgumentParser parser) throws ArgumentParserException {
if (opts.zkHost == null && opts.solrHomeDir == null) {
throw new ArgumentParserException("At least one of --zk-host or --solr-home-dir is required", parser);

View File

@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import com.cloudera.cdk.morphline.api.ExceptionHandler;
import com.cloudera.cdk.morphline.base.FaultTolerance;
import com.google.common.base.Preconditions;
/**
* This class loads the mapper's SolrInputDocuments into one EmbeddedSolrServer
@ -54,6 +55,7 @@ public class SolrReducer extends Reducer<Text, SolrInputDocumentWritable, Text,
@Override
protected void setup(Context context) throws IOException, InterruptedException {
verifyPartitionAssignment(context);
SolrRecordWriter.addReducerContext(context);
Class<? extends UpdateConflictResolver> resolverClass = context.getConfiguration().getClass(
UPDATE_CONFLICT_RESOLVER, RetainMostRecentUpdateConflictResolver.class, UpdateConflictResolver.class);
@ -107,6 +109,24 @@ public class SolrReducer extends Reducer<Text, SolrInputDocumentWritable, Text,
super.cleanup(context);
}
/*
* Verify that if a mapper's partitioner sends an item to partition X, that item is in fact
* sent to the reducer with taskID == X. This invariant is currently required for Solr
* documents to end up in the right Solr shard.
*/
private void verifyPartitionAssignment(Context context) {
if ("true".equals(System.getProperty("verifyPartitionAssignment", "true"))) {
String partitionStr = context.getConfiguration().get("mapred.task.partition");
if (partitionStr == null) {
partitionStr = context.getConfiguration().get("mapreduce.task.partition");
}
int partition = Integer.parseInt(partitionStr);
int taskId = context.getTaskAttemptID().getTaskID().getId();
Preconditions.checkArgument(partition == taskId,
"mapred.task.partition: " + partition + " not equal to reducer taskId: " + taskId);
}
}
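
As a usage note (sketch only): test harnesses that drive the reducer outside a real MR cluster, where the partition/taskId invariant need not hold, can switch the check off via the system property read above; MorphlineReducerTest further down in this commit does exactly that.

// Illustrative: disable the invariant check around an MRUnit-driven reducer run.
System.setProperty("verifyPartitionAssignment", "false");
try {
// ... run the ReduceDriver ...
} finally {
System.clearProperty("verifyPartitionAssignment");
}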
///////////////////////////////////////////////////////////////////////////////
// Nested classes:

View File

@ -34,6 +34,8 @@ public class TreeMergeMapper extends Mapper<LongWritable, Text, Text, NullWritab
public static final String MAX_SEGMENTS_ON_TREE_MERGE = "maxSegmentsOnTreeMerge";
public static final String SOLR_SHARD_NUMBER = "_solrShardNumber";
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
LOGGER.trace("map key: {}, value: {}", key, value);

View File

@ -17,6 +17,9 @@
package org.apache.solr.hadoop;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
@ -39,6 +42,9 @@ import org.apache.solr.store.hdfs.HdfsDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
/**
* See {@link IndexMergeTool}.
*/
@ -84,13 +90,15 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
@Override
public void close(TaskAttemptContext context) throws IOException {
LOG.debug("Merging into dstDir: " + workDir + ", srcDirs: {}", shards);
LOG.debug("Task " + context.getTaskAttemptID() + " merging into dstDir: " + workDir + ", srcDirs: " + shards);
writeShardNumberFile(context);
heartBeater.needHeartBeat();
try {
Directory mergedIndex = new HdfsDirectory(workDir, context.getConfiguration());
// TODO: shouldn't we pull the Version from the solrconfig.xml?
IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LUCENE_CURRENT, null)
.setOpenMode(OpenMode.CREATE)
.setOpenMode(OpenMode.CREATE).setUseCompoundFile(false)
//.setMergePolicy(mergePolicy) // TODO: grab tuned MergePolicy from solrconfig.xml?
//.setMergeScheduler(...) // TODO: grab tuned MergeScheduler from solrconfig.xml?
;
@ -162,6 +170,27 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
heartBeater.cancelHeartBeat();
heartBeater.close();
}
}
/*
* For background see MapReduceIndexerTool.renameTreeMergeShardDirs()
*
* Also see MapReduceIndexerTool.run() method where it uses
* NLineInputFormat.setNumLinesPerSplit(job, options.fanout)
*/
private void writeShardNumberFile(TaskAttemptContext context) throws IOException {
Preconditions.checkArgument(shards.size() > 0);
String shard = shards.get(0).getParent().getParent().getName(); // move up from "data/index"
String taskId = shard.substring("part-m-".length(), shard.length()); // e.g. part-m-00001
int taskNum = Integer.parseInt(taskId);
int outputShardNum = taskNum / shards.size();
LOG.debug("Merging into outputShardNum: " + outputShardNum + " from taskId: " + taskId);
Path shardNumberFile = new Path(workDir.getParent().getParent(), TreeMergeMapper.SOLR_SHARD_NUMBER);
OutputStream out = shardNumberFile.getFileSystem(context.getConfiguration()).create(shardNumberFile);
Writer writer = new OutputStreamWriter(out, Charsets.UTF_8);
writer.write(String.valueOf(outputShardNum));
writer.flush();
writer.close();
}
}
}

View File

@ -117,9 +117,11 @@ final class ZooKeeperInspector {
Collections.sort(sorted, new Comparator<Slice>() {
@Override
public int compare(Slice slice1, Slice slice2) {
return slice1.getName().compareTo(slice2.getName());
Comparator c = new AlphaNumericComparator();
return c.compare(slice1.getName(), slice2.getName());
}
});
LOG.trace("Sorted slices: {}", sorted);
return sorted;
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.hadoop;
import java.util.Comparator;
import org.junit.Assert;
import org.junit.Test;
public class AlphaNumericComparatorTest extends Assert {
@Test
public void testBasic() {
Comparator c = new AlphaNumericComparator();
assertTrue(c.compare("a", "b") < 0);
assertTrue(c.compare("shard1", "shard1") == 0);
//assertTrue(c.compare("shard01", "shard1") == 0);
assertTrue(c.compare("shard10", "shard10") == 0);
assertTrue(c.compare("shard1", "shard2") < 0);
assertTrue(c.compare("shard9", "shard10") < 0);
assertTrue(c.compare("shard09", "shard10") < 0);
assertTrue(c.compare("shard019", "shard10") > 0);
assertTrue(c.compare("shard10", "shard11") < 0);
assertTrue(c.compare("shard10z", "shard10z") == 0);
assertTrue(c.compare("shard10z", "shard11z") < 0);
assertTrue(c.compare("shard10a", "shard10z") < 0);
assertTrue(c.compare("shard10z", "shard10a") > 0);
assertTrue(c.compare("shard1z", "shard1z") == 0);
assertTrue(c.compare("shard2", "shard1") > 0);
}
}

View File

@ -54,6 +54,7 @@ public abstract class MRUnitBase extends SolrTestCaseJ4 {
setupMorphline(tempDir, "test-morphlines/solrCellDocumentTypes");
config.set(MorphlineMapRunner.MORPHLINE_FILE_PARAM, tempDir + "/test-morphlines/solrCellDocumentTypes.conf");
config.set(SolrOutputFormat.ZIP_NAME, solrHomeZip.getName());
}
public static void setupMorphline(String tempDir, String file) throws IOException {

View File

@ -43,6 +43,7 @@ import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.hadoop.hack.MiniMRCluster;
import org.apache.solr.handler.extraction.ExtractingParams;
import org.apache.solr.util.ExternalPaths;
import org.junit.After;
import org.junit.AfterClass;
@ -323,7 +324,7 @@ public class MorphlineBasicMiniMRTest extends SolrTestCaseJ4 {
jobConf.setMaxMapAttempts(1);
jobConf.setMaxReduceAttempts(1);
jobConf.setJar(SEARCH_ARCHIVES_JAR);
jobConf.setBoolean("ignoreTikaException", false);
jobConf.setBoolean(ExtractingParams.IGNORE_TIKA_EXCEPTION, false);
int shards = 2;
int maxReducers = Integer.MAX_VALUE;

View File

@ -25,9 +25,11 @@ import java.io.Writer;
import java.lang.reflect.Array;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
@ -36,16 +38,16 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrQuery.ORDER;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
@ -53,6 +55,9 @@ import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
@ -62,12 +67,12 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.hadoop.hack.MiniMRClientCluster;
import org.apache.solr.hadoop.hack.MiniMRClientClusterFactory;
import org.apache.solr.handler.extraction.ExtractingParams;
import org.apache.solr.util.ExternalPaths;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
@ -86,16 +91,16 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies.Conseque
@Slow
public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
private static final int RECORD_COUNT = 2104;
private static final String RESOURCES_DIR = ExternalPaths.SOURCE_HOME + "/contrib/map-reduce/src/test-files";
private static final String DOCUMENTS_DIR = RESOURCES_DIR + "/test-documents";
private static final File MINIMR_INSTANCE_DIR = new File(RESOURCES_DIR + "/solr/minimr");
private static final File MINIMR_CONF_DIR = new File(RESOURCES_DIR + "/solr/minimr");
private static final String SEARCH_ARCHIVES_JAR = JarFinder.getJar(MapReduceIndexerTool.class);
private static MiniDFSCluster dfsCluster = null;
private static MiniMRClientCluster mrCluster = null;
private static int numRuns = 0;
private static String tempDir;
private final String inputAvroFile1;
@ -115,17 +120,8 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
this.inputAvroFile3 = "sample-statuses-20120906-141433-medium.avro";
fixShardCount = true;
sliceCount = TEST_NIGHTLY ? 3 : 3;
shardCount = TEST_NIGHTLY ? 3 : 3;
}
private static boolean isYarn() {
try {
Job.class.getMethod("getCluster");
return true;
} catch (NoSuchMethodException e) {
return false;
}
sliceCount = TEST_NIGHTLY ? 7 : 3;
shardCount = TEST_NIGHTLY ? 7 : 3;
}
@BeforeClass
@ -371,7 +367,7 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
jobConf.setMaxMapAttempts(1);
jobConf.setMaxReduceAttempts(1);
jobConf.setJar(SEARCH_ARCHIVES_JAR);
jobConf.setBoolean("ignoreTikaException", false);
jobConf.setBoolean(ExtractingParams.IGNORE_TIKA_EXCEPTION, false);
MapReduceIndexerTool tool;
int res;
@ -384,7 +380,7 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
"--output-dir=" + outDir.toString(),
"--log4j=" + ExternalPaths.SOURCE_HOME + "/core/src/test-files/log4j.properties",
"--mappers=3",
++numRuns % 2 == 0 ? "--input-list=" + INPATH.toString() : dataDir.toString(),
random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),
"--go-live-threads", Integer.toString(random().nextInt(15) + 1),
"--verbose",
"--go-live"
@ -396,9 +392,7 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
if (true) {
tool = new MapReduceIndexerTool();
res = ToolRunner.run(jobConf, tool, args);
assertEquals(0, res);
assertTrue(tool.job.isComplete());
assertTrue(tool.job.isSuccessful());
@ -418,7 +412,7 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
"--mappers=3",
"--verbose",
"--go-live",
++numRuns % 2 == 0 ? "--input-list=" + INPATH.toString() : dataDir.toString(),
random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),
"--go-live-threads", Integer.toString(random().nextInt(15) + 1)
};
args = prependInitialArgs(args);
@ -449,14 +443,19 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
fs.delete(outDir, true);
fs.delete(dataDir, true);
INPATH = upAvroFile(fs, inDir, DATADIR, dataDir, inputAvroFile3);
cloudClient.deleteByQuery("*:*");
cloudClient.commit();
assertEquals(0, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());
args = new String[] {
"--output-dir=" + outDir.toString(),
"--mappers=3",
"--reducers=6",
"--reducers=12",
"--fanout=2",
"--verbose",
"--go-live",
++numRuns % 2 == 0 ? "--input-list=" + INPATH.toString() : dataDir.toString(),
random().nextBoolean() ? "--input-list=" + INPATH.toString() : dataDir.toString(),
"--zk-host", zkServer.getZkAddress(),
"--collection", collection
};
@ -469,15 +468,55 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
assertTrue(tool.job.isComplete());
assertTrue(tool.job.isSuccessful());
results = server.query(new SolrQuery("*:*"));
assertEquals(2126, results.getResults().getNumFound());
SolrDocumentList resultDocs = executeSolrQuery(cloudClient, "*:*");
assertEquals(RECORD_COUNT, resultDocs.getNumFound());
assertEquals(RECORD_COUNT, resultDocs.size());
// perform updates
for (int i = 0; i < RECORD_COUNT; i++) {
SolrDocument doc = resultDocs.get(i);
SolrInputDocument update = new SolrInputDocument();
for (Map.Entry<String, Object> entry : doc.entrySet()) {
update.setField(entry.getKey(), entry.getValue());
}
update.setField("user_screen_name", "Nadja" + i);
update.removeField("_version_");
cloudClient.add(update);
}
cloudClient.commit();
// verify updates
SolrDocumentList resultDocs2 = executeSolrQuery(cloudClient, "*:*");
assertEquals(RECORD_COUNT, resultDocs2.getNumFound());
assertEquals(RECORD_COUNT, resultDocs2.size());
for (int i = 0; i < RECORD_COUNT; i++) {
SolrDocument doc = resultDocs.get(i);
SolrDocument doc2 = resultDocs2.get(i);
assertEquals(doc.getFirstValue("id"), doc2.getFirstValue("id"));
assertEquals("Nadja" + i, doc2.getFirstValue("user_screen_name"));
assertEquals(doc.getFirstValue("text"), doc2.getFirstValue("text"));
// perform delete
cloudClient.deleteById((String)doc.getFirstValue("id"));
}
cloudClient.commit();
// verify deletes
assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
}
cloudClient.deleteByQuery("*:*");
cloudClient.commit();
assertEquals(0, cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound());
server.shutdown();
// try using zookeeper with replication
String replicatedCollection = "replicated_collection";
createCollection(replicatedCollection, 2, 3, 2);
if (TEST_NIGHTLY) {
createCollection(replicatedCollection, 11, 3, 11);
} else {
createCollection(replicatedCollection, 2, 3, 2);
}
waitForRecoveriesToFinish(false);
cloudClient.setDefaultCollection(replicatedCollection);
fs.delete(inDir, true);
@ -490,7 +529,8 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
"--solr-home-dir=" + MINIMR_CONF_DIR.getAbsolutePath(),
"--output-dir=" + outDir.toString(),
"--mappers=3",
"--reducers=6",
"--reducers=22",
"--fanout=2",
"--verbose",
"--go-live",
"--zk-host", zkServer.getZkAddress(),
@ -505,15 +545,51 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
assertTrue(tool.job.isComplete());
assertTrue(tool.job.isSuccessful());
results = cloudClient.query(new SolrQuery("*:*"));
assertEquals(2104, results.getResults().getNumFound());
SolrDocumentList resultDocs = executeSolrQuery(cloudClient, "*:*");
assertEquals(RECORD_COUNT, resultDocs.getNumFound());
assertEquals(RECORD_COUNT, resultDocs.size());
checkConsistency(replicatedCollection);
}
// perform updates
for (int i = 0; i < RECORD_COUNT; i++) {
SolrDocument doc = resultDocs.get(i);
SolrInputDocument update = new SolrInputDocument();
for (Map.Entry<String, Object> entry : doc.entrySet()) {
update.setField(entry.getKey(), entry.getValue());
}
update.setField("user_screen_name", "@Nadja" + i);
update.removeField("_version_");
cloudClient.add(update);
}
cloudClient.commit();
// verify updates
SolrDocumentList resultDocs2 = executeSolrQuery(cloudClient, "*:*");
assertEquals(RECORD_COUNT, resultDocs2.getNumFound());
assertEquals(RECORD_COUNT, resultDocs2.size());
for (int i = 0; i < RECORD_COUNT; i++) {
SolrDocument doc = resultDocs.get(i);
SolrDocument doc2 = resultDocs2.get(i);
assertEquals(doc.getFieldValues("id"), doc2.getFieldValues("id"));
assertEquals(1, doc.getFieldValues("id").size());
assertEquals(Arrays.asList("@Nadja" + i), doc2.getFieldValues("user_screen_name"));
assertEquals(doc.getFieldValues("text"), doc2.getFieldValues("text"));
// perform delete
cloudClient.deleteById((String)doc.getFirstValue("id"));
}
cloudClient.commit();
// verify deletes
assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
}
// try using solr_url with replication
cloudClient.deleteByQuery("*:*");
cloudClient.commit();
assertEquals(0, executeSolrQuery(cloudClient, "*:*").getNumFound());
assertEquals(0, executeSolrQuery(cloudClient, "*:*").size());
fs.delete(inDir, true);
fs.delete(dataDir, true);
assertTrue(fs.mkdirs(dataDir));
@ -543,8 +619,7 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
checkConsistency(replicatedCollection);
results = cloudClient.query(new SolrQuery("*:*"));
assertEquals(2104, results.getResults().getNumFound());
assertEquals(RECORD_COUNT, executeSolrQuery(cloudClient, "*:*").size());
}
}
@ -555,6 +630,12 @@ public class MorphlineGoLiveMiniMRTest extends AbstractFullDistribZkTestBase {
args.add(cloudJettys.get(i).url);
}
}
private SolrDocumentList executeSolrQuery(SolrServer collection, String queryString) throws SolrServerException {
SolrQuery query = new SolrQuery(queryString).setRows(2 * RECORD_COUNT).addSort("id", ORDER.asc);
QueryResponse response = collection.query(query);
return response.getResults();
}
private void checkConsistency(String replicatedCollection)
throws SolrServerException {

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.lucene.util.Constants;
import org.apache.solr.common.SolrInputDocument;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@ -44,10 +45,17 @@ import com.google.common.collect.Lists;
public class MorphlineReducerTest extends MRUnitBase {
@BeforeClass
public static void beforeClass() {
public static void beforeClass2() {
assumeFalse("Does not work on Windows, because it uses UNIX shell commands or POSIX paths", Constants.WINDOWS);
assumeFalse("FIXME: This test fails under Java 8 due to the Saxon dependency - see SOLR-1301", Constants.JRE_IS_MINIMUM_JAVA8);
assumeFalse("FIXME: This test fails under J9 due to the Saxon dependency - see SOLR-1301", System.getProperty("java.vm.info", "<?>").contains("IBM J9"));
System.setProperty("verifyPartitionAssignment", "false");
}
@AfterClass
public static void afterClass2() {
System.clearProperty("verifyPartitionAssignment");
}
public static class MySolrReducer extends SolrReducer {
@ -89,28 +97,35 @@ public class MorphlineReducerTest extends MRUnitBase {
@Test
public void testReducer() throws Exception {
MySolrReducer myReducer = new MySolrReducer();
ReduceDriver<Text, SolrInputDocumentWritable, Text, SolrInputDocumentWritable> reduceDriver = ReduceDriver.newReduceDriver(myReducer);
Configuration config = reduceDriver.getConfiguration();
setupHadoopConfig(config);
List<SolrInputDocumentWritable> values = new ArrayList<SolrInputDocumentWritable>();
SolrInputDocument sid = new SolrInputDocument();
String id = "myid1";
sid.addField("id", id);
sid.addField("text", "some unique text");
SolrInputDocumentWritable sidw = new SolrInputDocumentWritable(sid);
values.add(sidw);
reduceDriver.withInput(new Text(id), values);
reduceDriver.withCacheArchive(solrHomeZip.getAbsolutePath());
reduceDriver.withOutputFormat(SolrOutputFormat.class, NullInputFormat.class);
reduceDriver.run();
assertEquals("Expected 1 counter increment", 1, reduceDriver.getCounters()
.findCounter(SolrCounters.class.getName(), SolrCounters.DOCUMENTS_WRITTEN.toString()).getValue());
try {
ReduceDriver<Text,SolrInputDocumentWritable,Text,SolrInputDocumentWritable> reduceDriver = ReduceDriver
.newReduceDriver(myReducer);
Configuration config = reduceDriver.getConfiguration();
setupHadoopConfig(config);
List<SolrInputDocumentWritable> values = new ArrayList<SolrInputDocumentWritable>();
SolrInputDocument sid = new SolrInputDocument();
String id = "myid1";
sid.addField("id", id);
sid.addField("text", "some unique text");
SolrInputDocumentWritable sidw = new SolrInputDocumentWritable(sid);
values.add(sidw);
reduceDriver.withInput(new Text(id), values);
reduceDriver.withCacheArchive(solrHomeZip.getAbsolutePath());
reduceDriver.withOutputFormat(SolrOutputFormat.class,
NullInputFormat.class);
reduceDriver.run();
assertEquals("Expected 1 counter increment", 1,
reduceDriver.getCounters().findCounter(SolrCounters.class.getName(),
SolrCounters.DOCUMENTS_WRITTEN.toString()).getValue());
} finally {
myReducer.cleanup(myReducer.context);
}
}
}