From b145b4b83ec203b07fddd4f6fedd0c3209a3ab91 Mon Sep 17 00:00:00 2001 From: Jean-Daniel Cryans Date: Sat, 16 Jan 2010 00:02:10 +0000 Subject: [PATCH] HBASE-2136 Forward-port the old mapred package git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@899845 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hadoop/hbase/mapred/BuildTableIndex.java | 206 +++++++++ .../apache/hadoop/hbase/mapred/Driver.java | 40 ++ .../hadoop/hbase/mapred/GroupingTableMap.java | 162 +++++++ .../hbase/mapred/HRegionPartitioner.java | 91 ++++ .../hadoop/hbase/mapred/IdentityTableMap.java | 76 ++++ .../hbase/mapred/IdentityTableReduce.java | 61 +++ .../hbase/mapred/IndexConfiguration.java | 423 ++++++++++++++++++ .../hbase/mapred/IndexOutputFormat.java | 166 +++++++ .../hadoop/hbase/mapred/IndexTableReduce.java | 108 +++++ .../hbase/mapred/LuceneDocumentWrapper.java | 56 +++ .../hadoop/hbase/mapred/RowCounter.java | 137 ++++++ .../mapred/RowCounter_Counters.properties | 6 + .../hadoop/hbase/mapred/TableInputFormat.java | 83 ++++ .../hbase/mapred/TableInputFormatBase.java | 347 ++++++++++++++ .../apache/hadoop/hbase/mapred/TableMap.java | 39 ++ .../hbase/mapred/TableMapReduceUtil.java | 184 ++++++++ .../hbase/mapred/TableOutputFormat.java | 106 +++++ .../hadoop/hbase/mapred/TableReduce.java | 39 ++ .../hadoop/hbase/mapred/TableSplit.java | 113 +++++ .../hadoop/hbase/mapred/package-info.java | 128 ++++++ .../hbase/mapred/TestTableMapReduce.java | 244 ++++++++++ 22 files changed, 2816 insertions(+) create mode 100644 src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/Driver.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/RowCounter.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/RowCounter_Counters.properties create mode 100644 src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/TableMap.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/TableReduce.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/TableSplit.java create mode 100644 src/java/org/apache/hadoop/hbase/mapred/package-info.java create mode 100644 src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java diff --git a/CHANGES.txt b/CHANGES.txt index afdebad2f24..fc86a24f977 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -304,6 +304,7 @@ Release 0.21.0 - Unreleased HBASE-2107 Upgrading Lucene 2.2 to Lucene 3.0.0 (Kay Kay via Stack) HBASE-2111 Move to ivy broke our being able to run in-place; i.e. ./bin/start-hbase.sh in a checkout + HBASE-2136 Forward-port the old mapred package NEW FEATURES HBASE-1961 HBase EC2 scripts diff --git a/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java b/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java new file mode 100644 index 00000000000..cd66500a70c --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java @@ -0,0 +1,206 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; + +/** + * Example table column indexing class. Runs a mapreduce job to index + * specified table columns. + * + */ +@Deprecated +public class BuildTableIndex { + private static final String USAGE = "Usage: BuildTableIndex " + + "-m -r \n -indexConf " + + "-indexDir \n -table -columns " + + "[ ...]"; + + private static void printUsage(String message) { + System.err.println(message); + System.err.println(USAGE); + System.exit(-1); + } + + /** default constructor */ + public BuildTableIndex() { + super(); + } + + /** + * @param args + * @throws IOException + */ + public void run(String[] args) throws IOException { + if (args.length < 6) { + printUsage("Too few arguments"); + } + + int numMapTasks = 1; + int numReduceTasks = 1; + String iconfFile = null; + String indexDir = null; + String tableName = null; + StringBuffer columnNames = null; + + // parse args + for (int i = 0; i < args.length - 1; i++) { + if ("-m".equals(args[i])) { + numMapTasks = Integer.parseInt(args[++i]); + } else if ("-r".equals(args[i])) { + numReduceTasks = Integer.parseInt(args[++i]); + } else if ("-indexConf".equals(args[i])) { + iconfFile = args[++i]; + } else if ("-indexDir".equals(args[i])) { + indexDir = args[++i]; + } else if ("-table".equals(args[i])) { + tableName = args[++i]; + } else if ("-columns".equals(args[i])) { + columnNames = new StringBuffer(args[++i]); + while (i + 1 < args.length && !args[i + 1].startsWith("-")) { + columnNames.append(" "); + columnNames.append(args[++i]); + } + } else { + printUsage("Unsupported option " + args[i]); + } + } + + if (indexDir == null || tableName == null || columnNames == null) { + printUsage("Index directory, table name and at least one column must " + + "be specified"); + } + + Configuration conf = new HBaseConfiguration(); + if (iconfFile != null) { + // set index configuration content from a file + String content = readContent(iconfFile); + IndexConfiguration iconf = new IndexConfiguration(); + // purely to validate, exception will be thrown if not valid + iconf.addFromXML(content); + conf.set("hbase.index.conf", content); + } + + if (columnNames != null) { + JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir, + tableName, columnNames.toString()); + JobClient.runJob(jobConf); + } + } + + /** + * @param conf + * @param numMapTasks + * @param numReduceTasks + * @param indexDir + * @param tableName + * @param columnNames + * @return JobConf + */ + public JobConf createJob(Configuration conf, int numMapTasks, + int numReduceTasks, String indexDir, String tableName, + String columnNames) { + JobConf jobConf = new JobConf(conf, BuildTableIndex.class); + jobConf.setJobName("build index for table " + tableName); + jobConf.setNumMapTasks(numMapTasks); + // number of indexes to partition into + jobConf.setNumReduceTasks(numReduceTasks); + + // use identity map (a waste, but just as an example) + IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class, + jobConf); + + // use IndexTableReduce to build a Lucene index + jobConf.setReducerClass(IndexTableReduce.class); + FileOutputFormat.setOutputPath(jobConf, new Path(indexDir)); + jobConf.setOutputFormat(IndexOutputFormat.class); + + return jobConf; + } + + /* + * Read xml file of indexing configurations. The xml format is similar to + * hbase-default.xml and hadoop-default.xml. For an example configuration, + * see the createIndexConfContent method in TestTableIndex + * @param fileName File to read. + * @return XML configuration read from file + * @throws IOException + */ + private String readContent(String fileName) throws IOException { + File file = new File(fileName); + int length = (int) file.length(); + if (length == 0) { + printUsage("Index configuration file " + fileName + " does not exist"); + } + + int bytesRead = 0; + byte[] bytes = new byte[length]; + FileInputStream fis = new FileInputStream(file); + + try { + // read entire file into content + while (bytesRead < length) { + int read = fis.read(bytes, bytesRead, length - bytesRead); + if (read > 0) { + bytesRead += read; + } else { + break; + } + } + } finally { + fis.close(); + } + + return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING); + } + + /** + * @param args + * @throws IOException + */ + public static void main(String[] args) throws IOException { + BuildTableIndex build = new BuildTableIndex(); + build.run(args); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/Driver.java b/src/java/org/apache/hadoop/hbase/mapred/Driver.java new file mode 100644 index 00000000000..dcc40b178e6 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/Driver.java @@ -0,0 +1,40 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.util.ProgramDriver; + +/** + * Driver for hbase mapreduce jobs. Select which to run by passing + * name of job to this main. + */ +@Deprecated +public class Driver { + /** + * @param args + * @throws Throwable + */ + public static void main(String[] args) throws Throwable { + ProgramDriver pgd = new ProgramDriver(); + pgd.addClass(RowCounter.NAME, RowCounter.class, + "Count rows in HBase table"); + pgd.driver(args); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java b/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java new file mode 100644 index 00000000000..c368140e6b3 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java @@ -0,0 +1,162 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Map; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + + +/** + * Extract grouping columns from input record + */ +@Deprecated +public class GroupingTableMap +extends MapReduceBase +implements TableMap { + + /** + * JobConf parameter to specify the columns used to produce the key passed to + * collect from the map phase + */ + public static final String GROUP_COLUMNS = + "hbase.mapred.groupingtablemap.columns"; + + protected byte [][] columns; + + /** + * Use this before submitting a TableMap job. It will appropriately set up the + * JobConf. + * + * @param table table to be processed + * @param columns space separated list of columns to fetch + * @param groupColumns space separated list of columns used to form the key + * used in collect + * @param mapper map class + * @param job job configuration object + */ + @SuppressWarnings("unchecked") + public static void initJob(String table, String columns, String groupColumns, + Class mapper, JobConf job) { + + TableMapReduceUtil.initTableMapJob(table, columns, mapper, + ImmutableBytesWritable.class, Result.class, job); + job.set(GROUP_COLUMNS, groupColumns); + } + + @Override + public void configure(JobConf job) { + super.configure(job); + String[] cols = job.get(GROUP_COLUMNS, "").split(" "); + columns = new byte[cols.length][]; + for(int i = 0; i < cols.length; i++) { + columns[i] = Bytes.toBytes(cols[i]); + } + } + + /** + * Extract the grouping columns from value to construct a new key. + * + * Pass the new key and value to reduce. + * If any of the grouping columns are not found in the value, the record is skipped. + * @param key + * @param value + * @param output + * @param reporter + * @throws IOException + */ + public void map(ImmutableBytesWritable key, Result value, + OutputCollector output, + Reporter reporter) throws IOException { + + byte[][] keyVals = extractKeyValues(value); + if(keyVals != null) { + ImmutableBytesWritable tKey = createGroupKey(keyVals); + output.collect(tKey, value); + } + } + + /** + * Extract columns values from the current record. This method returns + * null if any of the columns are not found. + * + * Override this method if you want to deal with nulls differently. + * + * @param r + * @return array of byte values + */ + protected byte[][] extractKeyValues(Result r) { + byte[][] keyVals = null; + ArrayList foundList = new ArrayList(); + int numCols = columns.length; + if (numCols > 0) { + for (KeyValue value: r.list()) { + byte [] column = KeyValue.makeColumn(value.getFamily(), + value.getQualifier()); + for (int i = 0; i < numCols; i++) { + if (Bytes.equals(column, columns[i])) { + foundList.add(value.getValue()); + break; + } + } + } + if(foundList.size() == numCols) { + keyVals = foundList.toArray(new byte[numCols][]); + } + } + return keyVals; + } + + /** + * Create a key by concatenating multiple column values. + * Override this function in order to produce different types of keys. + * + * @param vals + * @return key generated by concatenating multiple column values + */ + protected ImmutableBytesWritable createGroupKey(byte[][] vals) { + if(vals == null) { + return null; + } + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < vals.length; i++) { + if(i > 0) { + sb.append(" "); + } + try { + sb.append(new String(vals[i], HConstants.UTF8_ENCODING)); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + return new ImmutableBytesWritable(Bytes.toBytes(sb.toString())); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java b/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java new file mode 100644 index 00000000000..8f264541b2c --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java @@ -0,0 +1,91 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Partitioner; + + +/** + * This is used to partition the output keys into groups of keys. + * Keys are grouped according to the regions that currently exist + * so that each reducer fills a single region so load is distributed. + * + * @param + * @param + */ +@Deprecated +public class HRegionPartitioner +implements Partitioner { + private final Log LOG = LogFactory.getLog(TableInputFormat.class); + private HTable table; + private byte[][] startKeys; + + public void configure(JobConf job) { + try { + this.table = new HTable(new HBaseConfiguration(job), + job.get(TableOutputFormat.OUTPUT_TABLE)); + } catch (IOException e) { + LOG.error(e); + } + + try { + this.startKeys = this.table.getStartKeys(); + } catch (IOException e) { + LOG.error(e); + } + } + + public int getPartition(ImmutableBytesWritable key, + V2 value, int numPartitions) { + byte[] region = null; + // Only one region return 0 + if (this.startKeys.length == 1){ + return 0; + } + try { + // Not sure if this is cached after a split so we could have problems + // here if a region splits while mapping + region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey(); + } catch (IOException e) { + LOG.error(e); + } + for (int i = 0; i < this.startKeys.length; i++){ + if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){ + if (i >= numPartitions-1){ + // cover if we have less reduces then regions. + return (Integer.toString(i).hashCode() + & Integer.MAX_VALUE) % numPartitions; + } + return i; + } + } + // if above fails to find start key that match we need to return something + return 0; + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java new file mode 100644 index 00000000000..0f67a9e7626 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java @@ -0,0 +1,76 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +/** + * Pass the given key and record as-is to reduce + */ +@Deprecated +public class IdentityTableMap +extends MapReduceBase +implements TableMap { + + /** constructor */ + public IdentityTableMap() { + super(); + } + + /** + * Use this before submitting a TableMap job. It will + * appropriately set up the JobConf. + * + * @param table table name + * @param columns columns to scan + * @param mapper mapper class + * @param job job configuration + */ + @SuppressWarnings("unchecked") + public static void initJob(String table, String columns, + Class mapper, JobConf job) { + TableMapReduceUtil.initTableMapJob(table, columns, mapper, + ImmutableBytesWritable.class, + Result.class, job); + } + + /** + * Pass the key, value to reduce + * @param key + * @param value + * @param output + * @param reporter + * @throws IOException + */ + public void map(ImmutableBytesWritable key, Result value, + OutputCollector output, + Reporter reporter) throws IOException { + + // convert + output.collect(key, value); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java new file mode 100644 index 00000000000..be0a6bd8089 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java @@ -0,0 +1,61 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +/** + * Write to table each key, record pair + */ +@Deprecated +public class IdentityTableReduce +extends MapReduceBase +implements TableReduce { + @SuppressWarnings("unused") + private static final Log LOG = + LogFactory.getLog(IdentityTableReduce.class.getName()); + + /** + * No aggregation, output pairs of (key, record) + * @param key + * @param values + * @param output + * @param reporter + * @throws IOException + */ + public void reduce(ImmutableBytesWritable key, Iterator values, + OutputCollector output, + Reporter reporter) + throws IOException { + + while(values.hasNext()) { + output.collect(key, values.next()); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java b/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java new file mode 100644 index 00000000000..df8ff290324 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java @@ -0,0 +1,423 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.ByteArrayInputStream; +import java.io.OutputStream; +import java.io.StringWriter; +import java.util.concurrent.ConcurrentHashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; + +/** + * Configuration parameters for building a Lucene index + */ +@Deprecated +public class IndexConfiguration extends Configuration { + private static final Log LOG = LogFactory.getLog(IndexConfiguration.class); + + static final String HBASE_COLUMN_NAME = "hbase.column.name"; + static final String HBASE_COLUMN_STORE = "hbase.column.store"; + static final String HBASE_COLUMN_INDEX = "hbase.column.index"; + static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize"; + static final String HBASE_COLUMN_BOOST = "hbase.column.boost"; + static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms"; + static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name"; + static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name"; + static final String HBASE_INDEX_MAX_BUFFERED_DOCS = + "hbase.index.max.buffered.docs"; + static final String HBASE_INDEX_MAX_BUFFERED_DELS = + "hbase.index.max.buffered.dels"; + static final String HBASE_INDEX_MAX_FIELD_LENGTH = + "hbase.index.max.field.length"; + static final String HBASE_INDEX_MAX_MERGE_DOCS = + "hbase.index.max.merge.docs"; + static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor"; + // double ramBufferSizeMB; + static final String HBASE_INDEX_SIMILARITY_NAME = + "hbase.index.similarity.name"; + static final String HBASE_INDEX_USE_COMPOUND_FILE = + "hbase.index.use.compound.file"; + static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize"; + + public static class ColumnConf extends Properties { + + private static final long serialVersionUID = 7419012290580607821L; + + boolean getBoolean(String name, boolean defaultValue) { + String valueString = getProperty(name); + if ("true".equals(valueString)) + return true; + else if ("false".equals(valueString)) + return false; + else + return defaultValue; + } + + void setBoolean(String name, boolean value) { + setProperty(name, Boolean.toString(value)); + } + + float getFloat(String name, float defaultValue) { + String valueString = getProperty(name); + if (valueString == null) + return defaultValue; + try { + return Float.parseFloat(valueString); + } catch (NumberFormatException e) { + return defaultValue; + } + } + + void setFloat(String name, float value) { + setProperty(name, Float.toString(value)); + } + } + + private Map columnMap = + new ConcurrentHashMap(); + + public Iterator columnNameIterator() { + return columnMap.keySet().iterator(); + } + + public boolean isIndex(String columnName) { + return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true); + } + + public void setIndex(String columnName, boolean index) { + getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index); + } + + public boolean isStore(String columnName) { + return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false); + } + + public void setStore(String columnName, boolean store) { + getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store); + } + + public boolean isTokenize(String columnName) { + return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true); + } + + public void setTokenize(String columnName, boolean tokenize) { + getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize); + } + + public float getBoost(String columnName) { + return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f); + } + + public void setBoost(String columnName, float boost) { + getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost); + } + + public boolean isOmitNorms(String columnName) { + return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true); + } + + public void setOmitNorms(String columnName, boolean omitNorms) { + getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms); + } + + private ColumnConf getColumn(String columnName) { + ColumnConf column = columnMap.get(columnName); + if (column == null) { + column = new ColumnConf(); + columnMap.put(columnName, column); + } + return column; + } + + public String getAnalyzerName() { + return get(HBASE_INDEX_ANALYZER_NAME, + "org.apache.lucene.analysis.standard.StandardAnalyzer"); + } + + public void setAnalyzerName(String analyzerName) { + set(HBASE_INDEX_ANALYZER_NAME, analyzerName); + } + + public int getMaxBufferedDeleteTerms() { + return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000); + } + + public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { + setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms); + } + + public int getMaxBufferedDocs() { + return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10); + } + + public void setMaxBufferedDocs(int maxBufferedDocs) { + setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs); + } + + public int getMaxFieldLength() { + return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE); + } + + public void setMaxFieldLength(int maxFieldLength) { + setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength); + } + + public int getMaxMergeDocs() { + return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE); + } + + public void setMaxMergeDocs(int maxMergeDocs) { + setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs); + } + + public int getMergeFactor() { + return getInt(HBASE_INDEX_MERGE_FACTOR, 10); + } + + public void setMergeFactor(int mergeFactor) { + setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor); + } + + public String getRowkeyName() { + return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY"); + } + + public void setRowkeyName(String rowkeyName) { + set(HBASE_INDEX_ROWKEY_NAME, rowkeyName); + } + + public String getSimilarityName() { + return get(HBASE_INDEX_SIMILARITY_NAME, null); + } + + public void setSimilarityName(String similarityName) { + set(HBASE_INDEX_SIMILARITY_NAME, similarityName); + } + + public boolean isUseCompoundFile() { + return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false); + } + + public void setUseCompoundFile(boolean useCompoundFile) { + setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile); + } + + public boolean doOptimize() { + return getBoolean(HBASE_INDEX_OPTIMIZE, true); + } + + public void setDoOptimize(boolean doOptimize) { + setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize); + } + + public void addFromXML(String content) { + try { + DocumentBuilder builder = DocumentBuilderFactory.newInstance() + .newDocumentBuilder(); + + Document doc = builder + .parse(new ByteArrayInputStream(content.getBytes())); + + Element root = doc.getDocumentElement(); + if (!"configuration".equals(root.getTagName())) { + LOG.fatal("bad conf file: top-level element not "); + } + + NodeList props = root.getChildNodes(); + for (int i = 0; i < props.getLength(); i++) { + Node propNode = props.item(i); + if (!(propNode instanceof Element)) { + continue; + } + + Element prop = (Element) propNode; + if ("property".equals(prop.getTagName())) { + propertyFromXML(prop, null); + } else if ("column".equals(prop.getTagName())) { + columnConfFromXML(prop); + } else { + LOG.warn("bad conf content: element neither nor "); + } + } + } catch (Exception e) { + LOG.fatal("error parsing conf content: " + e); + throw new RuntimeException(e); + } + } + + private void propertyFromXML(Element prop, Properties properties) { + NodeList fields = prop.getChildNodes(); + String attr = null; + String value = null; + + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) { + continue; + } + + Element field = (Element) fieldNode; + if ("name".equals(field.getTagName())) { + attr = ((Text) field.getFirstChild()).getData(); + } + if ("value".equals(field.getTagName()) && field.hasChildNodes()) { + value = ((Text) field.getFirstChild()).getData(); + } + } + + if (attr != null && value != null) { + if (properties == null) { + set(attr, value); + } else { + properties.setProperty(attr, value); + } + } + } + + private void columnConfFromXML(Element column) { + ColumnConf columnConf = new ColumnConf(); + NodeList props = column.getChildNodes(); + for (int i = 0; i < props.getLength(); i++) { + Node propNode = props.item(i); + if (!(propNode instanceof Element)) { + continue; + } + + Element prop = (Element) propNode; + if ("property".equals(prop.getTagName())) { + propertyFromXML(prop, columnConf); + } else { + LOG.warn("bad conf content: element not "); + } + } + + if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) { + columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf); + } else { + LOG.warn("bad column conf: name not specified"); + } + } + + public void write(OutputStream out) { + try { + Document doc = writeDocument(); + DOMSource source = new DOMSource(doc); + StreamResult result = new StreamResult(out); + TransformerFactory transFactory = TransformerFactory.newInstance(); + Transformer transformer = transFactory.newTransformer(); + transformer.transform(source, result); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Document writeDocument() { + Iterator> iter = iterator(); + try { + Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder() + .newDocument(); + Element conf = doc.createElement("configuration"); + doc.appendChild(conf); + conf.appendChild(doc.createTextNode("\n")); + + Map.Entry entry; + while (iter.hasNext()) { + entry = iter.next(); + String name = entry.getKey(); + String value = entry.getValue(); + writeProperty(doc, conf, name, value); + } + + Iterator columnIter = columnNameIterator(); + while (columnIter.hasNext()) { + writeColumn(doc, conf, columnIter.next()); + } + + return doc; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void writeProperty(Document doc, Element parent, String name, + String value) { + Element propNode = doc.createElement("property"); + parent.appendChild(propNode); + + Element nameNode = doc.createElement("name"); + nameNode.appendChild(doc.createTextNode(name)); + propNode.appendChild(nameNode); + + Element valueNode = doc.createElement("value"); + valueNode.appendChild(doc.createTextNode(value)); + propNode.appendChild(valueNode); + + parent.appendChild(doc.createTextNode("\n")); + } + + private void writeColumn(Document doc, Element parent, String columnName) { + Element column = doc.createElement("column"); + parent.appendChild(column); + column.appendChild(doc.createTextNode("\n")); + + ColumnConf columnConf = getColumn(columnName); + for (Map.Entry entry : columnConf.entrySet()) { + if (entry.getKey() instanceof String + && entry.getValue() instanceof String) { + writeProperty(doc, column, (String) entry.getKey(), (String) entry + .getValue()); + } + } + } + + @Override + public String toString() { + StringWriter writer = new StringWriter(); + try { + Document doc = writeDocument(); + DOMSource source = new DOMSource(doc); + StreamResult result = new StreamResult(writer); + TransformerFactory transFactory = TransformerFactory.newInstance(); + Transformer transformer = transFactory.newTransformer(); + transformer.transform(source, result); + } catch (Exception e) { + throw new RuntimeException(e); + } + return writer.toString(); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java new file mode 100644 index 00000000000..f970f9b51ae --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java @@ -0,0 +1,166 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.io.File; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.search.Similarity; +import org.apache.lucene.store.FSDirectory; + +/** + * Create a local index, unwrap Lucene documents created by reduce, add them to + * the index, and copy the index to the destination. + */ +@Deprecated +public class IndexOutputFormat extends + FileOutputFormat { + static final Log LOG = LogFactory.getLog(IndexOutputFormat.class); + + private Random random = new Random(); + + @Override + public RecordWriter + getRecordWriter(final FileSystem fs, JobConf job, String name, + final Progressable progress) + throws IOException { + + final Path perm = new Path(FileOutputFormat.getOutputPath(job), name); + final Path temp = job.getLocalPath("index/_" + + Integer.toString(random.nextInt())); + + LOG.info("To index into " + perm); + + // delete old, if any + fs.delete(perm, true); + + final IndexConfiguration indexConf = new IndexConfiguration(); + String content = job.get("hbase.index.conf"); + if (content != null) { + indexConf.addFromXML(content); + } + + String analyzerName = indexConf.getAnalyzerName(); + Analyzer analyzer; + try { + Class analyzerClass = Class.forName(analyzerName); + analyzer = (Analyzer) analyzerClass.newInstance(); + } catch (Exception e) { + throw new IOException("Error in creating an analyzer object " + + analyzerName); + } + + // build locally first + final IndexWriter writer = new IndexWriter(FSDirectory.open(new File(fs.startLocalOutput(perm, temp) + .toString())), analyzer, true, IndexWriter.MaxFieldLength.LIMITED); + + // no delete, so no need for maxBufferedDeleteTerms + writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs()); + writer.setMaxFieldLength(indexConf.getMaxFieldLength()); + writer.setMaxMergeDocs(indexConf.getMaxMergeDocs()); + writer.setMergeFactor(indexConf.getMergeFactor()); + String similarityName = indexConf.getSimilarityName(); + if (similarityName != null) { + try { + Class similarityClass = Class.forName(similarityName); + Similarity similarity = (Similarity) similarityClass.newInstance(); + writer.setSimilarity(similarity); + } catch (Exception e) { + throw new IOException("Error in creating a similarty object " + + similarityName); + } + } + writer.setUseCompoundFile(indexConf.isUseCompoundFile()); + + return new RecordWriter() { + boolean closed; + private long docCount = 0; + + public void write(ImmutableBytesWritable key, + LuceneDocumentWrapper value) + throws IOException { + // unwrap and index doc + Document doc = value.get(); + writer.addDocument(doc); + docCount++; + progress.progress(); + } + + public void close(final Reporter reporter) throws IOException { + // spawn a thread to give progress heartbeats + Thread prog = new Thread() { + @Override + public void run() { + while (!closed) { + try { + reporter.setStatus("closing"); + Thread.sleep(1000); + } catch (InterruptedException e) { + continue; + } catch (Throwable e) { + return; + } + } + } + }; + + try { + prog.start(); + + // optimize index + if (indexConf.doOptimize()) { + if (LOG.isInfoEnabled()) { + LOG.info("Optimizing index."); + } + writer.optimize(); + } + + // close index + writer.close(); + if (LOG.isInfoEnabled()) { + LOG.info("Done indexing " + docCount + " docs."); + } + + // copy to perm destination in dfs + fs.completeLocalOutput(perm, temp); + if (LOG.isInfoEnabled()) { + LOG.info("Copy done."); + } + } finally { + closed = true; + } + } + }; + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java b/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java new file mode 100644 index 00000000000..7869cab6bfd --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java @@ -0,0 +1,108 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Construct a Lucene document per row, which is consumed by IndexOutputFormat + * to build a Lucene index + */ +@Deprecated +public class IndexTableReduce extends MapReduceBase implements + Reducer { + private static final Log LOG = LogFactory.getLog(IndexTableReduce.class); + private IndexConfiguration indexConf; + + @Override + public void configure(JobConf job) { + super.configure(job); + indexConf = new IndexConfiguration(); + String content = job.get("hbase.index.conf"); + if (content != null) { + indexConf.addFromXML(content); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Index conf: " + indexConf); + } + } + + @Override + public void close() throws IOException { + super.close(); + } + + public void reduce(ImmutableBytesWritable key, Iterator values, + OutputCollector output, + Reporter reporter) + throws IOException { + Document doc = null; + while(values.hasNext()) { + Result r = values.next(); + if (doc == null) { + doc = new Document(); + // index and store row key, row key already UTF-8 encoded + Field keyField = new Field(indexConf.getRowkeyName(), + Bytes.toString(key.get(), key.getOffset(), key.getLength()), + Field.Store.YES, Field.Index.NOT_ANALYZED); + keyField.setOmitNorms(true); + doc.add(keyField); + } + // each column (name-value pair) is a field (name-value pair) + for (KeyValue kv: r.list()) { + // name is already UTF-8 encoded + String column = Bytes.toString(KeyValue.makeColumn(kv.getFamily(), + kv.getQualifier())); + byte[] columnValue = kv.getValue(); + Field.Store store = indexConf.isStore(column)? + Field.Store.YES: Field.Store.NO; + Field.Index index = indexConf.isIndex(column)? + (indexConf.isTokenize(column)? + Field.Index.ANALYZED: Field.Index.NOT_ANALYZED): + Field.Index.NO; + + // UTF-8 encode value + Field field = new Field(column, Bytes.toString(columnValue), + store, index); + field.setBoost(indexConf.getBoost(column)); + field.setOmitNorms(indexConf.isOmitNorms(column)); + + doc.add(field); + } + } + output.collect(key, new LuceneDocumentWrapper(doc)); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java b/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java new file mode 100644 index 00000000000..5d29d6a2d2a --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java @@ -0,0 +1,56 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import org.apache.hadoop.io.Writable; +import org.apache.lucene.document.Document; + +/** + * A utility class used to pass a lucene document from reduce to OutputFormat. + * It doesn't really serialize/deserialize a lucene document. + */ +@Deprecated +public class LuceneDocumentWrapper implements Writable { + protected Document doc; + + /** + * @param doc + */ + public LuceneDocumentWrapper(Document doc) { + this.doc = doc; + } + + /** + * @return the document + */ + public Document get() { + return doc; + } + + public void readFields(DataInput in) { + // intentionally left blank + } + + public void write(DataOutput out) { + // intentionally left blank + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java b/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java new file mode 100644 index 00000000000..f24d283d0b3 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/RowCounter.java @@ -0,0 +1,137 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A job with a map to count rows. + * Map outputs table rows IF the input row has columns that have content. + * Uses an {@link IdentityReducer} + */ +@Deprecated +public class RowCounter extends Configured implements Tool { + // Name of this 'program' + static final String NAME = "rowcounter"; + + /** + * Mapper that runs the count. + */ + static class RowCounterMapper + implements TableMap { + private static enum Counters {ROWS} + + public void map(ImmutableBytesWritable row, Result values, + OutputCollector output, + Reporter reporter) + throws IOException { + boolean content = false; + + for (KeyValue value: values.list()) { + if (value.getValue().length > 0) { + content = true; + break; + } + } + if (!content) { + // Don't count rows that are all empty values. + return; + } + // Give out same value every time. We're only interested in the row/key + reporter.incrCounter(Counters.ROWS, 1); + } + + public void configure(JobConf jc) { + // Nothing to do. + } + + public void close() throws IOException { + // Nothing to do. + } + } + + /** + * @param args + * @return the JobConf + * @throws IOException + */ + public JobConf createSubmittableJob(String[] args) throws IOException { + JobConf c = new JobConf(getConf(), getClass()); + c.setJobName(NAME); + // Columns are space delimited + StringBuilder sb = new StringBuilder(); + final int columnoffset = 2; + for (int i = columnoffset; i < args.length; i++) { + if (i > columnoffset) { + sb.append(" "); + } + sb.append(args[i]); + } + // Second argument is the table name. + TableMapReduceUtil.initTableMapJob(args[1], sb.toString(), + RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c); + c.setNumReduceTasks(0); + // First arg is the output directory. + FileOutputFormat.setOutputPath(c, new Path(args[0])); + return c; + } + + static int printUsage() { + System.out.println(NAME + + " [...]"); + return -1; + } + + public int run(final String[] args) throws Exception { + // Make sure there are at least 3 parameters + if (args.length < 3) { + System.err.println("ERROR: Wrong number of parameters: " + args.length); + return printUsage(); + } + JobClient.runJob(createSubmittableJob(args)); + return 0; + } + + /** + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + HBaseConfiguration c = new HBaseConfiguration(); + int errCode = ToolRunner.run(c, new RowCounter(), args); + System.exit(errCode); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/RowCounter_Counters.properties b/src/java/org/apache/hadoop/hbase/mapred/RowCounter_Counters.properties new file mode 100644 index 00000000000..02a978e4ae2 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/RowCounter_Counters.properties @@ -0,0 +1,6 @@ + +# ResourceBundle properties file for RowCounter MR job + +CounterGroupName= RowCounter + +ROWS.name= Rows \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java new file mode 100644 index 00000000000..1220a09fa48 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java @@ -0,0 +1,83 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.util.StringUtils; + +/** + * Convert HBase tabular data into a format that is consumable by Map/Reduce. + */ +@Deprecated +public class TableInputFormat extends TableInputFormatBase implements + JobConfigurable { + private final Log LOG = LogFactory.getLog(TableInputFormat.class); + + /** + * space delimited list of columns + */ + public static final String COLUMN_LIST = "hbase.mapred.tablecolumns"; + + public void configure(JobConf job) { + Path[] tableNames = FileInputFormat.getInputPaths(job); + String colArg = job.get(COLUMN_LIST); + String[] colNames = colArg.split(" "); + byte [][] m_cols = new byte[colNames.length][]; + for (int i = 0; i < m_cols.length; i++) { + m_cols[i] = Bytes.toBytes(colNames[i]); + } + setInputColumns(m_cols); + try { + setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName())); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + + public void validateInput(JobConf job) throws IOException { + // expecting exactly one path + Path [] tableNames = FileInputFormat.getInputPaths(job); + if (tableNames == null || tableNames.length > 1) { + throw new IOException("expecting one table name"); + } + + // connected to table? + if (getHTable() == null) { + throw new IOException("could not connect to table '" + + tableNames[0].getName() + "'"); + } + + // expecting at least one column + String colArg = job.get(COLUMN_LIST); + if (colArg == null || colArg.length() == 0) { + throw new IOException("expecting at least one column"); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java new file mode 100644 index 00000000000..f5eca94ad73 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java @@ -0,0 +1,347 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.StringUtils; + +/** + * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a + * byte[] of input columns and optionally a {@link Filter}. + * Subclasses may use other TableRecordReader implementations. + *

+ * An example of a subclass: + *

+ *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ *
+ *     public void configure(JobConf job) {
+ *       HTable exampleTable = new HTable(new HBaseConfiguration(job),
+ *         Bytes.toBytes("exampleTable"));
+ *       // mandatory
+ *       setHTable(exampleTable);
+ *       Text[] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *         Bytes.toBytes("columnB") };
+ *       // mandatory
+ *       setInputColumns(inputColumns);
+ *       RowFilterInterface exampleFilter = new RegExpRowFilter("keyPrefix.*");
+ *       // optional
+ *       setRowFilter(exampleFilter);
+ *     }
+ *
+ *     public void validateInput(JobConf job) throws IOException {
+ *     }
+ *  }
+ * 
+ */ + +@Deprecated +public abstract class TableInputFormatBase +implements InputFormat { + final Log LOG = LogFactory.getLog(TableInputFormatBase.class); + private byte [][] inputColumns; + private HTable table; + private TableRecordReader tableRecordReader; + private Filter rowFilter; + + /** + * Iterate over an HBase table data, return (Text, RowResult) pairs + */ + protected class TableRecordReader + implements RecordReader { + private byte [] startRow; + private byte [] endRow; + private byte [] lastRow; + private Filter trrRowFilter; + private ResultScanner scanner; + private HTable htable; + private byte [][] trrInputColumns; + + /** + * Restart from survivable exceptions by creating a new scanner. + * + * @param firstRow + * @throws IOException + */ + public void restart(byte[] firstRow) throws IOException { + if ((endRow != null) && (endRow.length > 0)) { + if (trrRowFilter != null) { + Scan scan = new Scan(firstRow, endRow); + scan.addColumns(trrInputColumns); + scan.setFilter(trrRowFilter); + this.scanner = this.htable.getScanner(scan); + } else { + LOG.debug("TIFB.restart, firstRow: " + + Bytes.toStringBinary(firstRow) + ", endRow: " + + Bytes.toStringBinary(endRow)); + Scan scan = new Scan(firstRow, endRow); + scan.addColumns(trrInputColumns); + this.scanner = this.htable.getScanner(scan); + } + } else { + LOG.debug("TIFB.restart, firstRow: " + + Bytes.toStringBinary(firstRow) + ", no endRow"); + + Scan scan = new Scan(firstRow); + scan.addColumns(trrInputColumns); +// scan.setFilter(trrRowFilter); + this.scanner = this.htable.getScanner(scan); + } + } + + /** + * Build the scanner. Not done in constructor to allow for extension. + * + * @throws IOException + */ + public void init() throws IOException { + restart(startRow); + } + + /** + * @param htable the {@link HTable} to scan. + */ + public void setHTable(HTable htable) { + this.htable = htable; + } + + /** + * @param inputColumns the columns to be placed in {@link Result}. + */ + public void setInputColumns(final byte [][] inputColumns) { + this.trrInputColumns = inputColumns; + } + + /** + * @param startRow the first row in the split + */ + public void setStartRow(final byte [] startRow) { + this.startRow = startRow; + } + + /** + * + * @param endRow the last row in the split + */ + public void setEndRow(final byte [] endRow) { + this.endRow = endRow; + } + + /** + * @param rowFilter the {@link Filter} to be used. + */ + public void setRowFilter(Filter rowFilter) { + this.trrRowFilter = rowFilter; + } + + public void close() { + this.scanner.close(); + } + + /** + * @return ImmutableBytesWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + /** + * @return RowResult + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + public Result createValue() { + return new Result(); + } + + public long getPos() { + // This should be the ordinal tuple in the range; + // not clear how to calculate... + return 0; + } + + public float getProgress() { + // Depends on the total number of tuples and getPos + return 0; + } + + /** + * @param key HStoreKey as input key. + * @param value MapWritable as input value + * @return true if there was more data + * @throws IOException + */ + public boolean next(ImmutableBytesWritable key, Result value) + throws IOException { + Result result; + try { + result = this.scanner.next(); + } catch (UnknownScannerException e) { + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + restart(lastRow); + this.scanner.next(); // skip presumed already mapped row + result = this.scanner.next(); + } + + if (result != null && result.size() > 0) { + key.set(result.getRow()); + lastRow = key.get(); + Writables.copyWritable(result, value); + return true; + } + return false; + } + } + + /** + * Builds a TableRecordReader. If no TableRecordReader was provided, uses + * the default. + * + * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, + * JobConf, Reporter) + */ + public RecordReader getRecordReader( + InputSplit split, JobConf job, Reporter reporter) + throws IOException { + TableSplit tSplit = (TableSplit) split; + TableRecordReader trr = this.tableRecordReader; + // if no table record reader was provided use default + if (trr == null) { + trr = new TableRecordReader(); + } + trr.setStartRow(tSplit.getStartRow()); + trr.setEndRow(tSplit.getEndRow()); + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + trr.init(); + return trr; + } + + /** + * Calculates the splits that will serve as input for the map tasks. + *
    + * Splits are created in number equal to the smallest between numSplits and + * the number of {@link HRegion}s in the table. If the number of splits is + * smaller than the number of {@link HRegion}s then splits are spanned across + * multiple {@link HRegion}s and are grouped the most evenly possible. In the + * case splits are uneven the bigger splits are placed first in the + * {@link InputSplit} array. + * + * @param job the map task {@link JobConf} + * @param numSplits a hint to calculate the number of splits (mapred.map.tasks). + * + * @return the input splits + * + * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int) + */ + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + if (this.table == null) { + throw new IOException("No table was provided"); + } + byte [][] startKeys = this.table.getStartKeys(); + if (startKeys == null || startKeys.length == 0) { + throw new IOException("Expecting at least one region"); + } + if (this.inputColumns == null || this.inputColumns.length == 0) { + throw new IOException("Expecting at least one column"); + } + int realNumSplits = numSplits > startKeys.length? startKeys.length: + numSplits; + InputSplit[] splits = new InputSplit[realNumSplits]; + int middle = startKeys.length / realNumSplits; + int startPos = 0; + for (int i = 0; i < realNumSplits; i++) { + int lastPos = startPos + middle; + lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos; + String regionLocation = table.getRegionLocation(startKeys[startPos]). + getServerAddress().getHostname(); + splits[i] = new TableSplit(this.table.getTableName(), + startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]: + HConstants.EMPTY_START_ROW, regionLocation); + LOG.info("split: " + i + "->" + splits[i]); + startPos = lastPos; + } + return splits; + } + + /** + * @param inputColumns to be passed in {@link Result} to the map task. + */ + protected void setInputColumns(byte [][] inputColumns) { + this.inputColumns = inputColumns; + } + + /** + * Allows subclasses to get the {@link HTable}. + */ + protected HTable getHTable() { + return this.table; + } + + /** + * Allows subclasses to set the {@link HTable}. + * + * @param table to get the data from + */ + protected void setHTable(HTable table) { + this.table = table; + } + + /** + * Allows subclasses to set the {@link TableRecordReader}. + * + * @param tableRecordReader + * to provide other {@link TableRecordReader} implementations. + */ + protected void setTableRecordReader(TableRecordReader tableRecordReader) { + this.tableRecordReader = tableRecordReader; + } + + /** + * Allows subclasses to set the {@link Filter} to be used. + * + * @param rowFilter + */ + protected void setRowFilter(Filter rowFilter) { + this.rowFilter = rowFilter; + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableMap.java b/src/java/org/apache/hadoop/hbase/mapred/TableMap.java new file mode 100644 index 00000000000..597f3efc4a9 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableMap.java @@ -0,0 +1,39 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.Mapper; + +/** + * Scan an HBase table to sort by a specified sort column. + * If the column does not exist, the record is not passed to Reduce. + * + * @param WritableComparable key class + * @param Writable value class + */ +@Deprecated +public interface TableMap, V extends Writable> +extends Mapper { + +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java new file mode 100644 index 00000000000..41748fe6de3 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -0,0 +1,184 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; + +/** + * Utility for {@link TableMap} and {@link TableReduce} + */ +@Deprecated +@SuppressWarnings("unchecked") +public class TableMapReduceUtil { + + /** + * Use this before submitting a TableMap job. It will + * appropriately set up the JobConf. + * + * @param table The table name to read from. + * @param columns The columns to scan. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job configuration to adjust. + */ + public static void initTableMapJob(String table, String columns, + Class mapper, + Class outputKeyClass, + Class outputValueClass, JobConf job) { + + job.setInputFormat(TableInputFormat.class); + job.setMapOutputValueClass(outputValueClass); + job.setMapOutputKeyClass(outputKeyClass); + job.setMapperClass(mapper); + FileInputFormat.addInputPaths(job, table); + job.set(TableInputFormat.COLUMN_LIST, columns); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job configuration to adjust. + * @throws IOException When determining the region count fails. + */ + public static void initTableReduceJob(String table, + Class reducer, JobConf job) + throws IOException { + initTableReduceJob(table, reducer, job, null); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job configuration to adjust. + * @param partitioner Partitioner to use. Pass null to use + * default partitioner. + * @throws IOException When determining the region count fails. + */ + public static void initTableReduceJob(String table, + Class reducer, JobConf job, Class partitioner) + throws IOException { + job.setOutputFormat(TableOutputFormat.class); + job.setReducerClass(reducer); + job.set(TableOutputFormat.OUTPUT_TABLE, table); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Put.class); + if (partitioner == HRegionPartitioner.class) { + job.setPartitionerClass(HRegionPartitioner.class); + HTable outputTable = new HTable(new HBaseConfiguration(job), table); + int regions = outputTable.getRegionsInfo().size(); + if (job.getNumReduceTasks() > regions) { + job.setNumReduceTasks(outputTable.getRegionsInfo().size()); + } + } else if (partitioner != null) { + job.setPartitionerClass(partitioner); + } + } + + /** + * Ensures that the given number of reduce tasks for the given job + * configuration does not exceed the number of regions for the given table. + * + * @param table The table to get the region count for. + * @param job The current job configuration to adjust. + * @throws IOException When retrieving the table details fails. + */ + public static void limitNumReduceTasks(String table, JobConf job) + throws IOException { + HTable outputTable = new HTable(new HBaseConfiguration(job), table); + int regions = outputTable.getRegionsInfo().size(); + if (job.getNumReduceTasks() > regions) + job.setNumReduceTasks(regions); + } + + /** + * Ensures that the given number of map tasks for the given job + * configuration does not exceed the number of regions for the given table. + * + * @param table The table to get the region count for. + * @param job The current job configuration to adjust. + * @throws IOException When retrieving the table details fails. + */ + public static void limitNumMapTasks(String table, JobConf job) + throws IOException { + HTable outputTable = new HTable(new HBaseConfiguration(job), table); + int regions = outputTable.getRegionsInfo().size(); + if (job.getNumMapTasks() > regions) + job.setNumMapTasks(regions); + } + + /** + * Sets the number of reduce tasks for the given job configuration to the + * number of regions the given table has. + * + * @param table The table to get the region count for. + * @param job The current job configuration to adjust. + * @throws IOException When retrieving the table details fails. + */ + public static void setNumReduceTasks(String table, JobConf job) + throws IOException { + HTable outputTable = new HTable(new HBaseConfiguration(job), table); + int regions = outputTable.getRegionsInfo().size(); + job.setNumReduceTasks(regions); + } + + /** + * Sets the number of map tasks for the given job configuration to the + * number of regions the given table has. + * + * @param table The table to get the region count for. + * @param job The current job configuration to adjust. + * @throws IOException When retrieving the table details fails. + */ + public static void setNumMapTasks(String table, JobConf job) + throws IOException { + HTable outputTable = new HTable(new HBaseConfiguration(job), table); + int regions = outputTable.getRegionsInfo().size(); + job.setNumMapTasks(regions); + } + + /** + * Sets the number of rows to return and cache with each scanner iteration. + * Higher caching values will enable faster mapreduce jobs at the expense of + * requiring more heap to contain the cached rows. + * + * @param job The current job configuration to adjust. + * @param batchSize The number of rows to return in batch with each scanner + * iteration. + */ + public static void setScannerCaching(JobConf job, int batchSize) { + job.setInt("hbase.client.scanner.caching", batchSize); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java new file mode 100644 index 00000000000..159b78e1b87 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java @@ -0,0 +1,106 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; + +/** + * Convert Map/Reduce output and write it to an HBase table + */ +@Deprecated +public class TableOutputFormat extends +FileOutputFormat { + + /** JobConf parameter that specifies the output table */ + public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; + private final Log LOG = LogFactory.getLog(TableOutputFormat.class); + + /** + * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) + * and write to an HBase table + */ + protected static class TableRecordWriter + implements RecordWriter { + private HTable m_table; + + /** + * Instantiate a TableRecordWriter with the HBase HClient for writing. + * + * @param table + */ + public TableRecordWriter(HTable table) { + m_table = table; + } + + public void close(Reporter reporter) + throws IOException { + m_table.flushCommits(); + } + + public void write(ImmutableBytesWritable key, + Put value) throws IOException { + m_table.put(new Put(value)); + } + } + + @Override + @SuppressWarnings("unchecked") + public RecordWriter getRecordWriter(FileSystem ignored, + JobConf job, String name, Progressable progress) throws IOException { + + // expecting exactly one path + + String tableName = job.get(OUTPUT_TABLE); + HTable table = null; + try { + table = new HTable(new HBaseConfiguration(job), tableName); + } catch(IOException e) { + LOG.error(e); + throw e; + } + table.setAutoFlush(false); + return new TableRecordWriter(table); + } + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) + throws FileAlreadyExistsException, InvalidJobConfException, IOException { + + String tableName = job.get(OUTPUT_TABLE); + if(tableName == null) { + throw new IOException("Must specify table name"); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java b/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java new file mode 100644 index 00000000000..155ce82ca76 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java @@ -0,0 +1,39 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.Reducer; + +/** + * Write a table, sorting by the input key + * + * @param key class + * @param value class + */ +@Deprecated +@SuppressWarnings("unchecked") +public interface TableReduce +extends Reducer { + +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java b/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java new file mode 100644 index 00000000000..5956ee8e046 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java @@ -0,0 +1,113 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.InputSplit; + +/** + * A table split corresponds to a key range [low, high) + */ +@Deprecated +public class TableSplit implements InputSplit, Comparable { + private byte [] m_tableName; + private byte [] m_startRow; + private byte [] m_endRow; + private String m_regionLocation; + + /** default constructor */ + public TableSplit() { + this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, ""); + } + + /** + * Constructor + * @param tableName + * @param startRow + * @param endRow + * @param location + */ + public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow, + final String location) { + this.m_tableName = tableName; + this.m_startRow = startRow; + this.m_endRow = endRow; + this.m_regionLocation = location; + } + + /** @return table name */ + public byte [] getTableName() { + return this.m_tableName; + } + + /** @return starting row key */ + public byte [] getStartRow() { + return this.m_startRow; + } + + /** @return end row key */ + public byte [] getEndRow() { + return this.m_endRow; + } + + /** @return the region's hostname */ + public String getRegionLocation() { + return this.m_regionLocation; + } + + public String[] getLocations() { + return new String[] {this.m_regionLocation}; + } + + public long getLength() { + // Not clear how to obtain this... seems to be used only for sorting splits + return 0; + } + + public void readFields(DataInput in) throws IOException { + this.m_tableName = Bytes.readByteArray(in); + this.m_startRow = Bytes.readByteArray(in); + this.m_endRow = Bytes.readByteArray(in); + this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in)); + } + + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, this.m_tableName); + Bytes.writeByteArray(out, this.m_startRow); + Bytes.writeByteArray(out, this.m_endRow); + Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation)); + } + + @Override + public String toString() { + return m_regionLocation + ":" + + Bytes.toStringBinary(m_startRow) + "," + Bytes.toStringBinary(m_endRow); + } + + public int compareTo(TableSplit o) { + return Bytes.compareTo(getStartRow(), o.getStartRow()); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/mapred/package-info.java b/src/java/org/apache/hadoop/hbase/mapred/package-info.java new file mode 100644 index 00000000000..cd15b99842a --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/mapred/package-info.java @@ -0,0 +1,128 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** +Provides HBase MapReduce +Input/OutputFormats, a table indexing MapReduce job, and utility + +

    Table of Contents

    + + +

    HBase, MapReduce and the CLASSPATH

    + +

    MapReduce jobs deployed to a MapReduce cluster do not by default have access +to the HBase configuration under $HBASE_CONF_DIR nor to HBase classes. +You could add hbase-site.xml to $HADOOP_HOME/conf and add +hbase-X.X.X.jar to the $HADOOP_HOME/lib and copy these +changes across your cluster but the cleanest means of adding hbase configuration +and classes to the cluster CLASSPATH is by uncommenting +HADOOP_CLASSPATH in $HADOOP_HOME/conf/hadoop-env.sh +and adding the path to the hbase jar and $HBASE_CONF_DIR directory. +Then copy the amended configuration around the cluster. +You'll probably need to restart the MapReduce cluster if you want it to notice +the new configuration. +

    + +

    For example, here is how you would amend hadoop-env.sh adding the +built hbase jar, hbase conf, and the PerformanceEvaluation class from +the built hbase test jar to the hadoop CLASSPATH: + +

    # Extra Java CLASSPATH elements. Optional.
    +# export HADOOP_CLASSPATH=
    +export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf
    + +

    Expand $HBASE_HOME in the above appropriately to suit your +local environment.

    + +

    After copying the above change around your cluster, this is how you would run +the PerformanceEvaluation MR job to put up 4 clients (Presumes a ready mapreduce +cluster): + +

    $HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4
    + +The PerformanceEvaluation class wil be found on the CLASSPATH because you +added $HBASE_HOME/build/test to HADOOP_CLASSPATH +

    + +

    Another possibility, if for example you do not have access to hadoop-env.sh or +are unable to restart the hadoop cluster, is bundling the hbase jar into a mapreduce +job jar adding it and its dependencies under the job jar lib/ +directory and the hbase conf into a job jar conf/ directory. + + +

    HBase as MapReduce job data source and sink

    + +

    HBase can be used as a data source, {@link org.apache.hadoop.hbase.mapred.TableInputFormat TableInputFormat}, +and data sink, {@link org.apache.hadoop.hbase.mapred.TableOutputFormat TableOutputFormat}, for MapReduce jobs. +Writing MapReduce jobs that read or write HBase, you'll probably want to subclass +{@link org.apache.hadoop.hbase.mapred.TableMap TableMap} and/or +{@link org.apache.hadoop.hbase.mapred.TableReduce TableReduce}. See the do-nothing +pass-through classes {@link org.apache.hadoop.hbase.mapred.IdentityTableMap IdentityTableMap} and +{@link org.apache.hadoop.hbase.mapred.IdentityTableReduce IdentityTableReduce} for basic usage. For a more +involved example, see {@link org.apache.hadoop.hbase.mapred.BuildTableIndex BuildTableIndex} +or review the org.apache.hadoop.hbase.mapred.TestTableMapReduce unit test. +

    + +

    Running mapreduce jobs that have hbase as source or sink, you'll need to +specify source/sink table and column names in your configuration.

    + +

    Reading from hbase, the TableInputFormat asks hbase for the list of +regions and makes a map-per-region or mapred.map.tasks maps, +whichever is smaller (If your job only has two maps, up mapred.map.tasks +to a number > number of regions). Maps will run on the adjacent TaskTracker +if you are running a TaskTracer and RegionServer per node. +Writing, it may make sense to avoid the reduce step and write yourself back into +hbase from inside your map. You'd do this when your job does not need the sort +and collation that mapreduce does on the map emitted data; on insert, +hbase 'sorts' so there is no point double-sorting (and shuffling data around +your mapreduce cluster) unless you need to. If you do not need the reduce, +you might just have your map emit counts of records processed just so the +framework's report at the end of your job has meaning or set the number of +reduces to zero and use TableOutputFormat. See example code +below. If running the reduce step makes sense in your case, its usually better +to have lots of reducers so load is spread across the hbase cluster.

    + +

    There is also a new hbase partitioner that will run as many reducers as +currently existing regions. The +{@link org.apache.hadoop.hbase.mapred.HRegionPartitioner} is suitable +when your table is large and your upload is not such that it will greatly +alter the number of existing regions when done; other use the default +partitioner. +

    + +

    Example Code

    +

    Sample Row Counter

    +

    See {@link org.apache.hadoop.hbase.mapred.RowCounter}. You should be able to run +it by doing: % ./bin/hadoop jar hbase-X.X.X.jar. This will invoke +the hbase MapReduce Driver class. Select 'rowcounter' from the choice of jobs +offered. You may need to add the hbase conf directory to $HADOOP_HOME/conf/hadoop-env.sh#HADOOP_CLASSPATH +so the rowcounter gets pointed at the right hbase cluster (or, build a new jar +with an appropriate hbase-site.xml built into your job jar). +

    +

    PerformanceEvaluation

    +

    See org.apache.hadoop.hbase.PerformanceEvaluation from hbase src/test. It runs +a mapreduce job to run concurrent clients reading and writing hbase. +

    + +*/ +package org.apache.hadoop.hbase.mapred; \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java b/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java new file mode 100644 index 00000000000..5a5c3c67c39 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java @@ -0,0 +1,244 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +/** + * Test Map/Reduce job over HBase tables. The map/reduce process we're testing + * on our tables is simple - take every row in the table, reverse the value of + * a particular cell, and write it back to the table. + */ +public class TestTableMapReduce extends MultiRegionTable { + private static final Log LOG = + LogFactory.getLog(TestTableMapReduce.class.getName()); + + static final String MULTI_REGION_TABLE_NAME = "mrtest"; + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); + + private static final byte [][] columns = new byte [][] { + INPUT_FAMILY, + OUTPUT_FAMILY + }; + + /** constructor */ + public TestTableMapReduce() { + super(Bytes.toString(INPUT_FAMILY)); + desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME); + desc.addFamily(new HColumnDescriptor(INPUT_FAMILY)); + desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY)); + } + + /** + * Pass the given key and processed record reduce + */ + public static class ProcessContentsMapper + extends MapReduceBase + implements TableMap { + /** + * Pass the key, and reversed value to reduce + * @param key + * @param value + * @param output + * @param reporter + * @throws IOException + */ + public void map(ImmutableBytesWritable key, Result value, + OutputCollector output, + Reporter reporter) + throws IOException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map>> + cf = value.getMap(); + if(!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + + // Get the original value and reverse it + + String originalValue = new String(value.getValue(INPUT_FAMILY, null), + HConstants.UTF8_ENCODING); + StringBuilder newValue = new StringBuilder(originalValue); + newValue.reverse(); + + // Now set the value to be collected + + Put outval = new Put(key.get()); + outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); + output.collect(key, outval); + } + } + + /** + * Test a map/reduce against a multi-region table + * @throws IOException + */ + public void testMultiRegionTable() throws IOException { + runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME)); + } + + private void runTestOnTable(HTable table) throws IOException { + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + + JobConf jobConf = null; + try { + LOG.info("Before map/reduce startup"); + jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumReduceTasks(1); + TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()), + Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class, + ImmutableBytesWritable.class, Put.class, jobConf); + TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()), + IdentityTableReduce.class, jobConf); + + LOG.info("Started " + Bytes.toString(table.getTableName())); + JobClient.runJob(jobConf); + LOG.info("After map/reduce completion"); + + // verify map-reduce results + verify(Bytes.toString(table.getTableName())); + } finally { + mrCluster.shutdown(); + if (jobConf != null) { + FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); + } + } + } + + private void verify(String tableName) throws IOException { + HTable table = new HTable(conf, tableName); + boolean verified = false; + long pause = conf.getLong("hbase.client.pause", 5 * 1000); + int numRetries = conf.getInt("hbase.client.retries.number", 5); + for (int i = 0; i < numRetries; i++) { + try { + LOG.info("Verification attempt #" + i); + verifyAttempt(table); + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. + LOG.debug("Verification attempt failed: " + e.getMessage()); + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + assertTrue(verified); + } + + /** + * Looks at every value of the mapreduce output and verifies that indeed + * the values have been reversed. + * @param table Table to scan. + * @throws IOException + * @throws NullPointerException if we failed to find a cell value + */ + private void verifyAttempt(final HTable table) throws IOException, NullPointerException { + Scan scan = new Scan(); + scan.addColumns(columns); + ResultScanner scanner = table.getScanner(scan); + try { + for (Result r : scanner) { + if (LOG.isDebugEnabled()) { + if (r.size() > 2 ) { + throw new IOException("Too many results, expected 2 got " + + r.size()); + } + } + byte[] firstValue = null; + byte[] secondValue = null; + int count = 0; + for(KeyValue kv : r.list()) { + if (count == 0) { + firstValue = kv.getValue(); + } + if (count == 1) { + secondValue = kv.getValue(); + } + count++; + if (count == 2) { + break; + } + } + + + String first = ""; + if (firstValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": first value is null"); + } + first = new String(firstValue, HConstants.UTF8_ENCODING); + + String second = ""; + if (secondValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": second value is null"); + } + byte[] secondReversed = new byte[secondValue.length]; + for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { + secondReversed[i] = secondValue[j]; + } + second = new String(secondReversed, HConstants.UTF8_ENCODING); + + if (first.compareTo(second) != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("second key is not the reverse of first. row=" + + r.getRow() + ", first value=" + first + ", second value=" + + second); + } + fail(); + } + } + } finally { + scanner.close(); + } + } +} \ No newline at end of file