From 293506c7cdef3e866c75a7b92558a423eb804d11 Mon Sep 17 00:00:00 2001 From: Dave Latham Date: Fri, 12 Jun 2015 16:00:00 -0700 Subject: [PATCH] HBASE-13639 SyncTable - rsync for HBase tables Signed-off-by: Andrew Purtell --- .../org/apache/hadoop/hbase/util/Bytes.java | 52 +- .../apache/hadoop/hbase/util/TestBytes.java | 48 ++ .../hadoop/hbase/mapreduce/HashTable.java | 747 +++++++++++++++++ .../hadoop/hbase/mapreduce/SyncTable.java | 773 ++++++++++++++++++ .../hadoop/hbase/mapreduce/TestHashTable.java | 192 +++++ .../hadoop/hbase/mapreduce/TestSyncTable.java | 334 ++++++++ 6 files changed, 2135 insertions(+), 11 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java index 9ac691237a7..5d452608ecb 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java @@ -648,7 +648,7 @@ public class Bytes implements Comparable { if (off + len > b.length) len = b.length - off; for (int i = off; i < off + len ; ++i) { int ch = b[i] & 0xFF; - if ( (ch >= '0' && ch <= '9') + if ((ch >= '0' && ch <= '9') || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) { @@ -2590,13 +2590,46 @@ public class Bytes implements Comparable { return result; } + private static final char[] HEX_CHARS = { + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' + }; + + /** + * Convert a byte range into a hex string + */ + public static String toHex(byte[] b, int offset, int length) { + checkArgument(length <= Integer.MAX_VALUE / 2); + int numChars = length * 2; + char[] ch = new char[numChars]; + for (int i = 0; i < numChars; i += 2) + { + byte d = b[offset + i/2]; + ch[i] = HEX_CHARS[(d >> 4) & 0x0F]; + ch[i+1] = HEX_CHARS[d & 0x0F]; + } + return new String(ch); + } + /** * Convert a byte array into a hex string - * @param b */ public static String toHex(byte[] b) { - checkArgument(b.length > 0, "length must be greater than 0"); - return String.format("%x", new BigInteger(1, b)); + return toHex(b, 0, b.length); + } + + private static int hexCharToNibble(char ch) { + if (ch <= '9' && ch >= '0') { + return ch - '0'; + } else if (ch >= 'a' && ch <= 'f') { + return ch - 'a' + 10; + } else if (ch >= 'A' && ch <= 'F') { + return ch - 'A' + 10; + } + throw new IllegalArgumentException("Invalid hex char: " + ch); + } + + private static byte hexCharsToByte(char c1, char c2) { + return (byte) ((hexCharToNibble(c1) << 4) | hexCharToNibble(c2)); } /** @@ -2605,14 +2638,11 @@ public class Bytes implements Comparable { * @param hex */ public static byte[] fromHex(String hex) { - checkArgument(hex.length() > 0, "length must be greater than 0"); checkArgument(hex.length() % 2 == 0, "length must be a multiple of 2"); - // Make sure letters are upper case - hex = hex.toUpperCase(); - byte[] b = new byte[hex.length() / 2]; - for (int i = 0; i < b.length; i++) { - b[i] = (byte)((toBinaryFromHex((byte)hex.charAt(2 * i)) << 4) + - toBinaryFromHex((byte)hex.charAt((2 * i + 1)))); + int 
len = hex.length(); + byte[] b = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + b[i / 2] = hexCharsToByte(hex.charAt(i),hex.charAt(i+1)); } return b; } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java index adb87fd9918..42afb28d943 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java @@ -24,7 +24,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Random; import junit.framework.TestCase; @@ -494,5 +496,51 @@ public class TestBytes extends TestCase { Assert.assertEquals(i, b[i]); } } + + public void testToFromHex() { + List testStrings = new ArrayList(); + testStrings.addAll(Arrays.asList(new String[] { + "", + "00", + "A0", + "ff", + "FFffFFFFFFFFFF", + "12", + "0123456789abcdef", + "283462839463924623984692834692346ABCDFEDDCA0", + })); + for (String testString : testStrings) + { + byte[] byteData = Bytes.fromHex(testString); + Assert.assertEquals(testString.length() / 2, byteData.length); + String result = Bytes.toHex(byteData); + Assert.assertTrue(testString.equalsIgnoreCase(result)); + } + + List testByteData = new ArrayList(); + testByteData.addAll(Arrays.asList(new byte[][] { + new byte[0], + new byte[1], + new byte[10], + new byte[] {1, 2, 3, 4, 5}, + new byte[] {(byte) 0xFF}, + })); + Random r = new Random(); + for (int i = 0; i < 20; i++) + { + + byte[] bytes = new byte[r.nextInt(100)]; + r.nextBytes(bytes); + testByteData.add(bytes); + } + + for (byte[] testData : testByteData) + { + String hexString = Bytes.toHex(testData); + Assert.assertEquals(testData.length * 2, hexString.length()); + byte[] result = Bytes.fromHex(hexString); + Assert.assertArrayEquals(testData, result); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java new file mode 100644 index 00000000000..20ae4a628e5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java @@ -0,0 +1,747 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.google.common.base.Charsets; +import com.google.common.base.Throwables; +import com.google.common.collect.Ordering; + +public class HashTable extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(HashTable.class); + + private static final int DEFAULT_BATCH_SIZE = 8000; + + private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size"; + final static String PARTITIONS_FILE_NAME = "partitions"; + final static String MANIFEST_FILE_NAME = "manifest"; + final static String HASH_DATA_DIR = "hashes"; + final static String OUTPUT_DATA_FILE_PREFIX = "part-r-"; + private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp"; + + TableHash tableHash = new TableHash(); + Path destPath; + + public HashTable(Configuration conf) { + super(conf); + } + + public static class TableHash { + + Path hashDir; + + String tableName; + String families = null; + long batchSize = DEFAULT_BATCH_SIZE; + int numHashFiles = 0; + byte[] startRow = HConstants.EMPTY_START_ROW; + byte[] stopRow = HConstants.EMPTY_END_ROW; + int scanBatch = 0; + int versions = -1; + long startTime = 0; + long endTime = 0; + + List partitions; + + public static TableHash read(Configuration conf, Path hashDir) throws IOException { + TableHash tableHash = new TableHash(); + FileSystem fs = hashDir.getFileSystem(conf); + tableHash.hashDir = hashDir; + tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME)); + tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME)); + return tableHash; + } + + void writePropertiesFile(FileSystem fs, Path path) throws IOException { + Properties p = new Properties(); + p.setProperty("table", tableName); + if (families != null) { + p.setProperty("columnFamilies", 
families); + } + p.setProperty("targetBatchSize", Long.toString(batchSize)); + p.setProperty("numHashFiles", Integer.toString(numHashFiles)); + if (!isTableStartRow(startRow)) { + p.setProperty("startRowHex", Bytes.toHex(startRow)); + } + if (!isTableEndRow(stopRow)) { + p.setProperty("stopRowHex", Bytes.toHex(stopRow)); + } + if (scanBatch > 0) { + p.setProperty("scanBatch", Integer.toString(scanBatch)); + } + if (versions >= 0) { + p.setProperty("versions", Integer.toString(versions)); + } + if (startTime != 0) { + p.setProperty("startTimestamp", Long.toString(startTime)); + } + if (endTime != 0) { + p.setProperty("endTimestamp", Long.toString(endTime)); + } + + FSDataOutputStream out = fs.create(path); + p.store(new OutputStreamWriter(out, Charsets.UTF_8), null); + out.close(); + } + + void readPropertiesFile(FileSystem fs, Path path) throws IOException { + FSDataInputStream in = fs.open(path); + Properties p = new Properties(); + p.load(new InputStreamReader(in, Charsets.UTF_8)); + in.close(); + + tableName = p.getProperty("table"); + families = p.getProperty("columnFamilies"); + batchSize = Long.parseLong(p.getProperty("targetBatchSize")); + numHashFiles = Integer.parseInt(p.getProperty("numHashFiles")); + + String startRowHex = p.getProperty("startRowHex"); + if (startRowHex != null) { + startRow = Bytes.fromHex(startRowHex); + } + String stopRowHex = p.getProperty("stopRowHex"); + if (stopRowHex != null) { + stopRow = Bytes.fromHex(stopRowHex); + } + + String scanBatchString = p.getProperty("scanBatch"); + if (scanBatchString != null) { + scanBatch = Integer.parseInt(scanBatchString); + } + + String versionString = p.getProperty("versions"); + if (versionString != null) { + versions = Integer.parseInt(versionString); + } + + String startTimeString = p.getProperty("startTimestamp"); + if (startTimeString != null) { + startTime = Long.parseLong(startTimeString); + } + + String endTimeString = p.getProperty("endTimestamp"); + if (endTimeString != null) { + endTime = Long.parseLong(endTimeString); + } + } + + Scan initScan() throws IOException { + Scan scan = new Scan(); + scan.setCacheBlocks(false); + if (startTime != 0 || endTime != 0) { + scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); + } + if (scanBatch > 0) { + scan.setBatch(scanBatch); + } + if (versions >= 0) { + scan.setMaxVersions(versions); + } + if (!isTableStartRow(startRow)) { + scan.setStartRow(startRow); + } + if (!isTableEndRow(stopRow)) { + scan.setStopRow(stopRow); + } + if(families != null) { + for(String fam : families.split(",")) { + scan.addFamily(Bytes.toBytes(fam)); + } + } + return scan; + } + + /** + * Choose partitions between row ranges to hash to a single output file + * Selects region boundaries that fall within the scan range, and groups them + * into the desired number of partitions. 
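+     * If numHashFiles was not set, it defaults to roughly one hash file per 100 regions
+     * (at least one), and is capped at the number of regions covered by the scan.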
+     */
+    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
+      List<byte[]> startKeys = new ArrayList<byte[]>();
+      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
+        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
+        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
+
+        // drop this region if the scan begins after it ends, or ends before it starts
+        // in other words:
+        //   IF (scan begins before the end of this region
+        //      AND scan ends after the start of this region)
+        //   THEN include this region
+        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
+            || Bytes.compareTo(startRow, regionEndKey) < 0)
+          && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
+            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+          startKeys.add(regionStartKey);
+        }
+      }
+
+      int numRegions = startKeys.size();
+      if (numHashFiles == 0) {
+        numHashFiles = numRegions / 100;
+      }
+      if (numHashFiles == 0) {
+        numHashFiles = 1;
+      }
+      if (numHashFiles > numRegions) {
+        // can't partition within regions
+        numHashFiles = numRegions;
+      }
+
+      // choose a subset of start keys to group regions into ranges
+      partitions = new ArrayList<ImmutableBytesWritable>(numHashFiles - 1);
+      // skip the first start key as it is not a partition between ranges.
+      for (long i = 1; i < numHashFiles; i++) {
+        int splitIndex = (int) (numRegions * i / numHashFiles);
+        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
+      }
+    }
+
+    void writePartitionFile(Configuration conf, Path path) throws IOException {
+      FileSystem fs = path.getFileSystem(conf);
+      @SuppressWarnings("deprecation")
+      SequenceFile.Writer writer = SequenceFile.createWriter(
+          fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
+
+      for (int i = 0; i < partitions.size(); i++) {
+        writer.append(partitions.get(i), NullWritable.get());
+      }
+      writer.close();
+    }
+
+    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
+        throws IOException {
+      @SuppressWarnings("deprecation")
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      ImmutableBytesWritable key = new ImmutableBytesWritable();
+      partitions = new ArrayList<ImmutableBytesWritable>();
+      while (reader.next(key)) {
+        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
+      }
+      reader.close();
+
+      if (!Ordering.natural().isOrdered(partitions)) {
+        throw new IOException("Partitions are not ordered!");
+      }
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("tableName=").append(tableName);
+      if (families != null) {
+        sb.append(", families=").append(families);
+      }
+      sb.append(", batchSize=").append(batchSize);
+      sb.append(", numHashFiles=").append(numHashFiles);
+      if (!isTableStartRow(startRow)) {
+        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
+      }
+      if (!isTableEndRow(stopRow)) {
+        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
+      }
+      if (scanBatch >= 0) {
+        sb.append(", scanBatch=").append(scanBatch);
+      }
+      if (versions >= 0) {
+        sb.append(", versions=").append(versions);
+      }
+      if (startTime != 0) {
+        sb.append(", startTime=").append(startTime);
+      }
+      if (endTime != 0) {
+        sb.append(", endTime=").append(endTime);
+      }
+      return sb.toString();
+    }
+
+    static String getDataFileName(int hashFileIndex) {
+      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
+    }
+
+    /**
+     * Open a TableHash.Reader starting at the first hash at or after the given key.
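+     * The partition keys are used to pick the hash file whose key range covers startKey;
+     * the reader then advances through subsequent hash files transparently as it is read.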
+ * @throws IOException + */ + public Reader newReader(Configuration conf, ImmutableBytesWritable startKey) + throws IOException { + return new Reader(conf, startKey); + } + + public class Reader implements java.io.Closeable { + private final Configuration conf; + + private int hashFileIndex; + private MapFile.Reader mapFileReader; + + private boolean cachedNext; + private ImmutableBytesWritable key; + private ImmutableBytesWritable hash; + + Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException { + this.conf = conf; + int partitionIndex = Collections.binarySearch(partitions, startKey); + if (partitionIndex >= 0) { + // if the key is equal to a partition, then go the file after that partition + hashFileIndex = partitionIndex+1; + } else { + // if the key is between partitions, then go to the file between those partitions + hashFileIndex = -1-partitionIndex; + } + openHashFile(); + + // MapFile's don't make it easy to seek() so that the subsequent next() returns + // the desired key/value pair. So we cache it for the first call of next(). + hash = new ImmutableBytesWritable(); + key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash); + if (key == null) { + cachedNext = false; + hash = null; + } else { + cachedNext = true; + } + } + + /** + * Read the next key/hash pair. + * Returns true if such a pair exists and false when at the end of the data. + */ + public boolean next() throws IOException { + if (cachedNext) { + cachedNext = false; + return true; + } + key = new ImmutableBytesWritable(); + hash = new ImmutableBytesWritable(); + while (true) { + boolean hasNext = mapFileReader.next(key, hash); + if (hasNext) { + return true; + } + hashFileIndex++; + if (hashFileIndex < TableHash.this.numHashFiles) { + mapFileReader.close(); + openHashFile(); + } else { + key = null; + hash = null; + return false; + } + } + } + + /** + * Get the current key + * @return the current key or null if there is no current key + */ + public ImmutableBytesWritable getCurrentKey() { + return key; + } + + /** + * Get the current hash + * @return the current hash or null if there is no current hash + */ + public ImmutableBytesWritable getCurrentHash() { + return hash; + } + + private void openHashFile() throws IOException { + if (mapFileReader != null) { + mapFileReader.close(); + } + Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR); + Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex)); + mapFileReader = new MapFile.Reader(dataFile, conf); + } + + @Override + public void close() throws IOException { + mapFileReader.close(); + } + } + } + + static boolean isTableStartRow(byte[] row) { + return Bytes.equals(HConstants.EMPTY_START_ROW, row); + } + + static boolean isTableEndRow(byte[] row) { + return Bytes.equals(HConstants.EMPTY_END_ROW, row); + } + + public Job createSubmittableJob(String[] args) throws IOException { + Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME); + generatePartitions(partitionsPath); + + Job job = Job.getInstance(getConf(), + getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName)); + Configuration jobConf = job.getConfiguration(); + jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize); + job.setJarByClass(HashTable.class); + + TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(), + HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + + // use a TotalOrderPartitioner and reducers to group region output into hash files + 
job.setPartitionerClass(TotalOrderPartitioner.class); + TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath); + job.setReducerClass(Reducer.class); // identity reducer + job.setNumReduceTasks(tableHash.numHashFiles); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(ImmutableBytesWritable.class); + job.setOutputFormatClass(MapFileOutputFormat.class); + FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR)); + + return job; + } + + private void generatePartitions(Path partitionsPath) throws IOException { + Connection connection = ConnectionFactory.createConnection(getConf()); + Pair regionKeys + = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys(); + connection.close(); + + tableHash.selectPartitions(regionKeys); + LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath); + + tableHash.writePartitionFile(getConf(), partitionsPath); + } + + static class ResultHasher { + private MessageDigest digest; + + private boolean batchStarted = false; + private ImmutableBytesWritable batchStartKey; + private ImmutableBytesWritable batchHash; + private long batchSize = 0; + + + public ResultHasher() { + try { + digest = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + Throwables.propagate(e); + } + } + + public void startBatch(ImmutableBytesWritable row) { + if (batchStarted) { + throw new RuntimeException("Cannot start new batch without finishing existing one."); + } + batchStarted = true; + batchSize = 0; + batchStartKey = row; + batchHash = null; + } + + public void hashResult(Result result) { + if (!batchStarted) { + throw new RuntimeException("Cannot add to batch that has not been started."); + } + for (Cell cell : result.rawCells()) { + int rowLength = cell.getRowLength(); + int familyLength = cell.getFamilyLength(); + int qualifierLength = cell.getQualifierLength(); + int valueLength = cell.getValueLength(); + digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength); + digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength); + digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength); + long ts = cell.getTimestamp(); + for (int i = 8; i > 0; i--) { + digest.update((byte) ts); + ts >>>= 8; + } + digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength); + + batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength; + } + } + + public void finishBatch() { + if (!batchStarted) { + throw new RuntimeException("Cannot finish batch that has not started."); + } + batchStarted = false; + batchHash = new ImmutableBytesWritable(digest.digest()); + } + + public boolean isBatchStarted() { + return batchStarted; + } + + public ImmutableBytesWritable getBatchStartKey() { + return batchStartKey; + } + + public ImmutableBytesWritable getBatchHash() { + return batchHash; + } + + public long getBatchSize() { + return batchSize; + } + } + + public static class HashMapper + extends TableMapper { + + private ResultHasher hasher; + private long targetBatchSize; + + private ImmutableBytesWritable currentRow; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + targetBatchSize = context.getConfiguration() + .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); + hasher = new ResultHasher(); + + TableSplit split = (TableSplit) context.getInputSplit(); + hasher.startBatch(new ImmutableBytesWritable(split.getStartRow())); + } + + @Override + 
protected void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + + if (currentRow == null || !currentRow.equals(key)) { + currentRow = new ImmutableBytesWritable(key); // not immutable + + if (hasher.getBatchSize() >= targetBatchSize) { + hasher.finishBatch(); + context.write(hasher.getBatchStartKey(), hasher.getBatchHash()); + hasher.startBatch(currentRow); + } + } + + hasher.hashResult(value); + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + hasher.finishBatch(); + context.write(hasher.getBatchStartKey(), hasher.getBatchHash()); + } + } + + private void writeTempManifestFile() throws IOException { + Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME); + FileSystem fs = tempManifestPath.getFileSystem(getConf()); + tableHash.writePropertiesFile(fs, tempManifestPath); + } + + private void completeManifest() throws IOException { + Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME); + Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME); + FileSystem fs = tempManifestPath.getFileSystem(getConf()); + fs.rename(tempManifestPath, manifestPath); + } + + private static final int NUM_ARGS = 2; + private static void printUsage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + System.err.println(); + } + System.err.println("Usage: HashTable [options] "); + System.err.println(); + System.err.println("Options:"); + System.err.println(" batchsize the target amount of bytes to hash in each batch"); + System.err.println(" rows are added to the batch until this size is reached"); + System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)"); + System.err.println(" numhashfiles the number of hash files to create"); + System.err.println(" if set to fewer than number of regions then"); + System.err.println(" the job will create this number of reducers"); + System.err.println(" (defaults to 1/100 of regions -- at least 1)"); + System.err.println(" startrow the start row"); + System.err.println(" stoprow the stop row"); + System.err.println(" starttime beginning of the time range (unixtime in millis)"); + System.err.println(" without endtime means from starttime to forever"); + System.err.println(" endtime end of the time range. 
Ignored if no starttime specified."); + System.err.println(" scanbatch scanner batch size to support intra row scans"); + System.err.println(" versions number of cell versions to include"); + System.err.println(" families comma-separated list of families to include"); + System.err.println(); + System.err.println("Args:"); + System.err.println(" tablename Name of the table to hash"); + System.err.println(" outputpath Filesystem path to put the output data"); + System.err.println(); + System.err.println("Examples:"); + System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:"); + System.err.println(" $ bin/hbase " + + "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50" + + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3" + + " TestTable /hashes/testTable"); + } + + private boolean doCommandLine(final String[] args) { + if (args.length < NUM_ARGS) { + printUsage(null); + return false; + } + try { + + tableHash.tableName = args[args.length-2]; + destPath = new Path(args[args.length-1]); + + for (int i = 0; i < args.length - NUM_ARGS; i++) { + String cmd = args[i]; + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(null); + return false; + } + + final String batchSizeArgKey = "--batchsize="; + if (cmd.startsWith(batchSizeArgKey)) { + tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length())); + continue; + } + + final String numHashFilesArgKey = "--numhashfiles="; + if (cmd.startsWith(numHashFilesArgKey)) { + tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length())); + continue; + } + + final String startRowArgKey = "--startrow="; + if (cmd.startsWith(startRowArgKey)) { + tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length())); + continue; + } + + final String stopRowArgKey = "--stoprow="; + if (cmd.startsWith(stopRowArgKey)) { + tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length())); + continue; + } + + final String startTimeArgKey = "--starttime="; + if (cmd.startsWith(startTimeArgKey)) { + tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); + continue; + } + + final String endTimeArgKey = "--endtime="; + if (cmd.startsWith(endTimeArgKey)) { + tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); + continue; + } + + final String scanBatchArgKey = "--scanbatch="; + if (cmd.startsWith(scanBatchArgKey)) { + tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length())); + continue; + } + + final String versionsArgKey = "--versions="; + if (cmd.startsWith(versionsArgKey)) { + tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); + continue; + } + + final String familiesArgKey = "--families="; + if (cmd.startsWith(familiesArgKey)) { + tableHash.families = cmd.substring(familiesArgKey.length()); + continue; + } + + printUsage("Invalid argument '" + cmd + "'"); + return false; + } + if ((tableHash.startTime != 0 || tableHash.endTime != 0) + && (tableHash.startTime >= tableHash.endTime)) { + printUsage("Invalid time range filter: starttime=" + + tableHash.startTime + " >= endtime=" + tableHash.endTime); + return false; + } + + } catch (Exception e) { + e.printStackTrace(); + printUsage("Can't start because " + e.getMessage()); + return false; + } + return true; + } + + /** + * Main entry point. 
+ */ + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + if (!doCommandLine(otherArgs)) { + return 1; + } + + Job job = createSubmittableJob(otherArgs); + writeTempManifestFile(); + if (!job.waitForCompletion(true)) { + LOG.info("Map-reduce job failed!"); + return 1; + } + completeManifest(); + return 0; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java new file mode 100644 index 00000000000..3495ca97006 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -0,0 +1,773 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; + +public class SyncTable extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(SyncTable.class); + + static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir"; + static final String 
SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
+  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
+  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
+  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
+  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
+
+  Path sourceHashDir;
+  String sourceTableName;
+  String targetTableName;
+
+  String sourceZkCluster;
+  String targetZkCluster;
+  boolean dryRun;
+
+  Counters counters;
+
+  public SyncTable(Configuration conf) {
+    super(conf);
+  }
+
+  public Job createSubmittableJob(String[] args) throws IOException {
+    FileSystem fs = sourceHashDir.getFileSystem(getConf());
+    if (!fs.exists(sourceHashDir)) {
+      throw new IOException("Source hash dir not found: " + sourceHashDir);
+    }
+
+    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
+    LOG.info("Read source hash manifest: " + tableHash);
+    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
+    if (!tableHash.tableName.equals(sourceTableName)) {
+      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
+          + tableHash.tableName + " but job is reading from: " + sourceTableName);
+    }
+    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
+      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
+          + " should be 1 more than the number of partition keys. However, the manifest file"
+          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
+          + " found in the partitions file is " + tableHash.partitions.size());
+    }
+
+    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
+    int dataSubdirCount = 0;
+    for (FileStatus file : fs.listStatus(dataDir)) {
+      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
+        dataSubdirCount++;
+      }
+    }
+
+    if (dataSubdirCount != tableHash.numHashFiles) {
+      throw new RuntimeException("Hash data appears corrupt. The number of hash data dirs"
+          + " should match the number of hash files in the manifest. However, the number of"
+          + " data dirs found is " + dataSubdirCount + " but the manifest says numHashFiles="
+          + tableHash.numHashFiles);
+    }
+
+    Job job = Job.getInstance(getConf(), getConf().get("mapreduce.job.name",
+        "syncTable_" + sourceTableName + "-" + targetTableName));
+    Configuration jobConf = job.getConfiguration();
+    job.setJarByClass(HashTable.class);
+    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
+    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
+    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
+    if (sourceZkCluster != null) {
+      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
+    }
+    if (targetZkCluster != null) {
+      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
+    }
+    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
+
+    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
+        SyncMapper.class, null, null, job);
+
+    job.setNumReduceTasks(0);
+
+    if (dryRun) {
+      job.setOutputFormatClass(NullOutputFormat.class);
+    } else {
+      // No reducers. Just write straight to table. Call initTableReducerJob
+      // because it sets up the TableOutputFormat.
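+      // Passing targetZkCluster here points TableOutputFormat at the target
+      // cluster when it is different from the cluster running the job.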
+      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
+          targetZkCluster, null, null);
+
+      // would be nice to add an option for bulk load instead
+    }
+
+    return job;
+  }
+
+  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
+    Path sourceHashDir;
+
+    Connection sourceConnection;
+    Connection targetConnection;
+    Table sourceTable;
+    Table targetTable;
+    boolean dryRun;
+
+    HashTable.TableHash sourceTableHash;
+    HashTable.TableHash.Reader sourceHashReader;
+    ImmutableBytesWritable currentSourceHash;
+    ImmutableBytesWritable nextSourceKey;
+    HashTable.ResultHasher targetHasher;
+
+    Throwable mapperException;
+
+    public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
+      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
+      MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
+
+    @Override
+    protected void setup(Context context) throws IOException {
+
+      Configuration conf = context.getConfiguration();
+      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
+      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY);
+      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY);
+      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
+      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
+      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
+
+      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
+      LOG.info("Read source hash manifest: " + sourceTableHash);
+      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
+
+      TableSplit split = (TableSplit) context.getInputSplit();
+      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
+
+      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
+      findNextKeyHashPair();
+
+      // create a hasher, but don't start it right away
+      // instead, find the first hash batch at or after the start row
+      // and skip any rows that come before. they will be caught by the previous task
+      targetHasher = new HashTable.ResultHasher();
+    }
+
+    private static Connection openConnection(Configuration conf, String zkClusterConfKey)
+        throws IOException {
+      Configuration clusterConf = new Configuration(conf);
+      String zkCluster = conf.get(zkClusterConfKey);
+      if (zkCluster != null) {
+        ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster);
+      }
+      return ConnectionFactory.createConnection(clusterConf);
+    }
+
+    private static Table openTable(Connection connection, Configuration conf,
+        String tableNameConfKey) throws IOException {
+      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
+    }
+
+    /**
+     * Attempt to read the next source key/hash pair.
+     * If there are no more, set nextSourceKey to null.
+     */
+    private void findNextKeyHashPair() throws IOException {
+      boolean hasNext = sourceHashReader.next();
+      if (hasNext) {
+        nextSourceKey = sourceHashReader.getCurrentKey();
+      } else {
+        // no more keys - last hash goes to the end
+        nextSourceKey = null;
+      }
+    }
+
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      try {
+        // first, finish any hash batches that end before the scanned row
+        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
+          moveToNextBatch(context);
+        }
+
+        // next, add the scanned row (as long as we've reached the first batch)
+        if (targetHasher.isBatchStarted()) {
+          targetHasher.hashResult(value);
+        }
+      } catch (Throwable t) {
+        mapperException = t;
+        Throwables.propagateIfInstanceOf(t, IOException.class);
+        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
+        Throwables.propagate(t);
+      }
+    }
+
+    /**
+     * If there is an open hash batch, complete it and sync if there are diffs.
+     * Start a new batch, and seek to read the next source key/hash pair.
+     */
+    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
+      if (targetHasher.isBatchStarted()) {
+        finishBatchAndCompareHashes(context);
+      }
+      targetHasher.startBatch(nextSourceKey);
+      currentSourceHash = sourceHashReader.getCurrentHash();
+
+      findNextKeyHashPair();
+    }
+
+    /**
+     * Finish the currently open hash batch.
+     * Compare the target hash to the given source hash.
+     * If they do not match, then sync the covered key range.
+     */
+    private void finishBatchAndCompareHashes(Context context)
+        throws IOException, InterruptedException {
+      targetHasher.finishBatch();
+      context.getCounter(Counter.BATCHES).increment(1);
+      if (targetHasher.getBatchSize() == 0) {
+        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
+      }
+      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
+      if (targetHash.equals(currentSourceHash)) {
+        context.getCounter(Counter.HASHES_MATCHED).increment(1);
+      } else {
+        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
+
+        ImmutableBytesWritable stopRow = nextSourceKey == null
+            ? new ImmutableBytesWritable(sourceTableHash.stopRow)
+            : nextSourceKey;
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey())
+              + " to " + toHex(stopRow)
+              + " sourceHash: " + toHex(currentSourceHash)
+              + " targetHash: " + toHex(targetHash));
+        }
+
+        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
+      }
+    }
+
+    private static String toHex(ImmutableBytesWritable bytes) {
+      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
+    }
+
+    private static final CellScanner EMPTY_CELL_SCANNER
+        = new CellScanner(Iterators.<Result>emptyIterator());
+
+    /**
+     * Rescan the given range directly from the source and target tables.
+ * Count and log differences, and if this is not a dry run, output Puts and Deletes + * to make the target table match the source table for this range + */ + private void syncRange(Context context, ImmutableBytesWritable startRow, + ImmutableBytesWritable stopRow) throws IOException, InterruptedException { + + Scan scan = sourceTableHash.initScan(); + scan.setStartRow(startRow.copyBytes()); + scan.setStopRow(stopRow.copyBytes()); + + ResultScanner sourceScanner = sourceTable.getScanner(scan); + CellScanner sourceCells = new CellScanner(sourceScanner.iterator()); + + ResultScanner targetScanner = targetTable.getScanner(scan); + CellScanner targetCells = new CellScanner(targetScanner.iterator()); + + boolean rangeMatched = true; + byte[] nextSourceRow = sourceCells.nextRow(); + byte[] nextTargetRow = targetCells.nextRow(); + while(nextSourceRow != null || nextTargetRow != null) { + boolean rowMatched; + int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow); + if (rowComparison < 0) { + if (LOG.isInfoEnabled()) { + LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow)); + } + context.getCounter(Counter.TARGETMISSINGROWS).increment(1); + + rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER); + nextSourceRow = sourceCells.nextRow(); // advance only source to next row + } else if (rowComparison > 0) { + if (LOG.isInfoEnabled()) { + LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow)); + } + context.getCounter(Counter.SOURCEMISSINGROWS).increment(1); + + rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells); + nextTargetRow = targetCells.nextRow(); // advance only target to next row + } else { + // current row is the same on both sides, compare cell by cell + rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells); + nextSourceRow = sourceCells.nextRow(); + nextTargetRow = targetCells.nextRow(); + } + + if (!rowMatched) { + rangeMatched = false; + } + } + + sourceScanner.close(); + targetScanner.close(); + + context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED) + .increment(1); + } + + private static class CellScanner { + private final Iterator results; + + private byte[] currentRow; + private Result currentRowResult; + private int nextCellInRow; + + private Result nextRowResult; + + public CellScanner(Iterator results) { + this.results = results; + } + + /** + * Advance to the next row and return its row key. + * Returns null iff there are no more rows. + */ + public byte[] nextRow() { + if (nextRowResult == null) { + // no cached row - check scanner for more + while (results.hasNext()) { + nextRowResult = results.next(); + Cell nextCell = nextRowResult.rawCells()[0]; + if (currentRow == null + || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(), + nextCell.getRowOffset(), nextCell.getRowLength())) { + // found next row + break; + } else { + // found another result from current row, keep scanning + nextRowResult = null; + } + } + + if (nextRowResult == null) { + // end of data, no more rows + currentRowResult = null; + currentRow = null; + return null; + } + } + + // advance to cached result for next row + currentRowResult = nextRowResult; + nextCellInRow = 0; + currentRow = currentRowResult.getRow(); + nextRowResult = null; + return currentRow; + } + + /** + * Returns the next Cell in the current row or null iff none remain. 
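+       * May read ahead into the scanner; a Result that begins a new row is cached
+       * and returned by the following call to nextRow().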
+ */ + public Cell nextCellInRow() { + if (currentRowResult == null) { + // nothing left in current row + return null; + } + + Cell nextCell = currentRowResult.rawCells()[nextCellInRow]; + nextCellInRow++; + if (nextCellInRow == currentRowResult.size()) { + if (results.hasNext()) { + Result result = results.next(); + Cell cell = result.rawCells()[0]; + if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(), + cell.getRowOffset(), cell.getRowLength())) { + // result is part of current row + currentRowResult = result; + nextCellInRow = 0; + } else { + // result is part of next row, cache it + nextRowResult = result; + // current row is complete + currentRowResult = null; + } + } else { + // end of data + currentRowResult = null; + } + } + return nextCell; + } + } + + /** + * Compare the cells for the given row from the source and target tables. + * Count and log any differences. + * If not a dry run, output a Put and/or Delete needed to sync the target table + * to match the source table. + */ + private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells, + CellScanner targetCells) throws IOException, InterruptedException { + Put put = null; + Delete delete = null; + long matchingCells = 0; + boolean matchingRow = true; + Cell sourceCell = sourceCells.nextCellInRow(); + Cell targetCell = targetCells.nextCellInRow(); + while (sourceCell != null || targetCell != null) { + + int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell); + if (cellKeyComparison < 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Target missing cell: " + sourceCell); + } + context.getCounter(Counter.TARGETMISSINGCELLS).increment(1); + matchingRow = false; + + if (!dryRun) { + if (put == null) { + put = new Put(rowKey); + } + put.add(sourceCell); + } + + sourceCell = sourceCells.nextCellInRow(); + } else if (cellKeyComparison > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Source missing cell: " + targetCell); + } + context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1); + matchingRow = false; + + if (!dryRun) { + if (delete == null) { + delete = new Delete(rowKey); + } + // add a tombstone to exactly match the target cell that is missing on the source + delete.addColumn(CellUtil.cloneFamily(targetCell), + CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp()); + } + + targetCell = targetCells.nextCellInRow(); + } else { + // the cell keys are equal, now check values + if (CellUtil.matchingValue(sourceCell, targetCell)) { + matchingCells++; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Different values: "); + LOG.debug(" source cell: " + sourceCell + + " value: " + Bytes.toHex(sourceCell.getValueArray(), + sourceCell.getValueOffset(), sourceCell.getValueLength())); + LOG.debug(" target cell: " + targetCell + + " value: " + Bytes.toHex(targetCell.getValueArray(), + targetCell.getValueOffset(), targetCell.getValueLength())); + } + context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1); + matchingRow = false; + + if (!dryRun) { + // overwrite target cell + if (put == null) { + put = new Put(rowKey); + } + put.add(sourceCell); + } + } + sourceCell = sourceCells.nextCellInRow(); + targetCell = targetCells.nextCellInRow(); + } + + if (!dryRun && sourceTableHash.scanBatch > 0) { + if (put != null && put.size() >= sourceTableHash.scanBatch) { + context.write(new ImmutableBytesWritable(rowKey), put); + put = null; + } + if (delete != null && delete.size() >= sourceTableHash.scanBatch) { + context.write(new ImmutableBytesWritable(rowKey), 
delete); + delete = null; + } + } + } + + if (!dryRun) { + if (put != null) { + context.write(new ImmutableBytesWritable(rowKey), put); + } + if (delete != null) { + context.write(new ImmutableBytesWritable(rowKey), delete); + } + } + + if (matchingCells > 0) { + context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells); + } + if (matchingRow) { + context.getCounter(Counter.MATCHINGROWS).increment(1); + return true; + } else { + context.getCounter(Counter.ROWSWITHDIFFS).increment(1); + return false; + } + } + + private static final CellComparator cellComparator = new CellComparator(); + /** + * Compare row keys of the given Result objects. + * Nulls are after non-nulls + */ + private static int compareRowKeys(byte[] r1, byte[] r2) { + if (r1 == null) { + return 1; // source missing row + } else if (r2 == null) { + return -1; // target missing row + } else { + return cellComparator.compareRows(r1, 0, r1.length, r2, 0, r2.length); + } + } + + /** + * Compare families, qualifiers, and timestamps of the given Cells. + * They are assumed to be of the same row. + * Nulls are after non-nulls. + */ + private static int compareCellKeysWithinRow(Cell c1, Cell c2) { + if (c1 == null) { + return 1; // source missing cell + } + if (c2 == null) { + return -1; // target missing cell + } + + int result = CellComparator.compareFamilies(c1, c2); + if (result != 0) { + return result; + } + + result = CellComparator.compareQualifiers(c1, c2); + if (result != 0) { + return result; + } + + // note timestamp comparison is inverted - more recent cells first + return CellComparator.compareTimestamps(c1, c2); + } + + @Override + protected void cleanup(Context context) + throws IOException, InterruptedException { + if (mapperException == null) { + try { + finishRemainingHashRanges(context); + } catch (Throwable t) { + mapperException = t; + } + } + + try { + sourceTable.close(); + targetTable.close(); + sourceConnection.close(); + targetConnection.close(); + } catch (Throwable t) { + if (mapperException == null) { + mapperException = t; + } else { + LOG.error("Suppressing exception from closing tables", t); + } + } + + // propagate first exception + if (mapperException != null) { + Throwables.propagateIfInstanceOf(mapperException, IOException.class); + Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class); + Throwables.propagate(mapperException); + } + } + + private void finishRemainingHashRanges(Context context) throws IOException, + InterruptedException { + TableSplit split = (TableSplit) context.getInputSplit(); + byte[] splitEndRow = split.getEndRow(); + boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow); + + // if there are more hash batches that begin before the end of this split move to them + while (nextSourceKey != null + && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) { + moveToNextBatch(context); + } + + if (targetHasher.isBatchStarted()) { + // need to complete the final open hash batch + + if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0) + || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) { + // the open hash range continues past the end of this region + // add a scan to complete the current hash range + Scan scan = sourceTableHash.initScan(); + scan.setStartRow(splitEndRow); + if (nextSourceKey == null) { + scan.setStopRow(sourceTableHash.stopRow); + } else { + scan.setStopRow(nextSourceKey.copyBytes()); + } + + ResultScanner targetScanner = targetTable.getScanner(scan); + for 
(Result row : targetScanner) { + targetHasher.hashResult(row); + } + } // else current batch ends exactly at split end row + + finishBatchAndCompareHashes(context); + } + } + } + + private static final int NUM_ARGS = 3; + private static void printUsage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + System.err.println(); + } + System.err.println("Usage: SyncTable [options] "); + System.err.println(); + System.err.println("Options:"); + + System.err.println(" sourcezkcluster ZK cluster key of the source table"); + System.err.println(" (defaults to cluster in classpath's config)"); + System.err.println(" targetzkcluster ZK cluster key of the target table"); + System.err.println(" (defaults to cluster in classpath's config)"); + System.err.println(" dryrun if true, output counters but no writes"); + System.err.println(" (defaults to false)"); + System.err.println(); + System.err.println("Args:"); + System.err.println(" sourcehashdir path to HashTable output dir for source table"); + System.err.println(" if not specified, then all data will be scanned"); + System.err.println(" sourcetable Name of the source table to sync from"); + System.err.println(" targettable Name of the target table to sync to"); + System.err.println(); + System.err.println("Examples:"); + System.err.println(" For a dry run SyncTable of tableA from a remote source cluster"); + System.err.println(" to a local target cluster:"); + System.err.println(" $ bin/hbase " + + "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true" + + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase" + + " hdfs://nn:9000/hashes/tableA tableA tableA"); + } + + private boolean doCommandLine(final String[] args) { + if (args.length < NUM_ARGS) { + printUsage(null); + return false; + } + try { + sourceHashDir = new Path(args[args.length - 3]); + sourceTableName = args[args.length - 2]; + targetTableName = args[args.length - 1]; + + for (int i = 0; i < args.length - NUM_ARGS; i++) { + String cmd = args[i]; + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(null); + return false; + } + + final String sourceZkClusterKey = "--sourcezkcluster="; + if (cmd.startsWith(sourceZkClusterKey)) { + sourceZkCluster = cmd.substring(sourceZkClusterKey.length()); + continue; + } + + final String targetZkClusterKey = "--targetzkcluster="; + if (cmd.startsWith(targetZkClusterKey)) { + targetZkCluster = cmd.substring(targetZkClusterKey.length()); + continue; + } + + final String dryRunKey = "--dryrun="; + if (cmd.startsWith(dryRunKey)) { + dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length())); + continue; + } + + printUsage("Invalid argument '" + cmd + "'"); + return false; + } + + + } catch (Exception e) { + e.printStackTrace(); + printUsage("Can't start because " + e.getMessage()); + return false; + } + return true; + } + + /** + * Main entry point. 
+ */ + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + if (!doCommandLine(otherArgs)) { + return 1; + } + + Job job = createSubmittableJob(otherArgs); + if (!job.waitForCompletion(true)) { + LOG.info("Map-reduce job failed!"); + return 1; + } + counters = job.getCounters(); + return 0; + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java new file mode 100644 index 00000000000..762f5301634 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.MapFile; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +/** + * Basic test for the HashTable M/R tool + */ +@Category(LargeTests.class) +public class TestHashTable { + + private static final Log LOG = LogFactory.getLog(TestHashTable.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testHashTable() throws Exception { + final String tableName = "testHashTable"; + final byte[] family = Bytes.toBytes("family"); + final byte[] column1 = Bytes.toBytes("c1"); + final byte[] column2 = Bytes.toBytes("c2"); + final byte[] 
column3 = Bytes.toBytes("c3");
+
+    int numRows = 100;
+    int numRegions = 10;
+    int numHashFiles = 3;
+
+    byte[][] splitRows = new byte[numRegions-1][];
+    for (int i = 1; i < numRegions; i++) {
+      splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
+    }
+
+    long timestamp = 1430764183454L;
+    // put rows into the first table
+    HTable t1 = TEST_UTIL.createTable(TableName.valueOf(tableName), family, splitRows);
+    for (int i = 0; i < numRows; i++) {
+      Put p = new Put(Bytes.toBytes(i), timestamp);
+      p.addColumn(family, column1, column1);
+      p.addColumn(family, column2, column2);
+      p.addColumn(family, column3, column3);
+      t1.put(p);
+    }
+    t1.close();
+
+    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
+
+    Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName);
+
+    long batchSize = 300;
+    int code = hashTable.run(new String[] {
+        "--batchsize=" + batchSize,
+        "--numhashfiles=" + numHashFiles,
+        "--scanbatch=2",
+        tableName,
+        testDir.toString()});
+    assertEquals("test job failed", 0, code);
+
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
+    assertEquals(tableName, tableHash.tableName);
+    assertEquals(batchSize, tableHash.batchSize);
+    assertEquals(numHashFiles, tableHash.numHashFiles);
+    assertEquals(numHashFiles - 1, tableHash.partitions.size());
+    for (ImmutableBytesWritable bytes : tableHash.partitions) {
+      LOG.debug("partition: " + Bytes.toInt(bytes.get()));
+    }
+
+    ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes
+      = ImmutableMap.<Integer, ImmutableBytesWritable>builder()
+        .put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f")))
+        .put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96")))
+        .put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa")))
+        .put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881")))
+        .put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352")))
+        .put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93")))
+        .put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666")))
+        .put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090")))
+        .put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3")))
+        .put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb")))
+        .put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc")))
+        .put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4")))
+        .put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b")))
+        .put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59")))
+        .put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f")))
+        .put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56")))
+        .put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095")))
+        .put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91")))
+        .put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38")))
+        .put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56")))
+        .build();
+
+    Map<Integer, ImmutableBytesWritable> actualHashes
+      = new HashMap<Integer, ImmutableBytesWritable>();
+    Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
+    for (int i = 0; i < numHashFiles; i++) {
+      Path hashPath = new Path(dataDir,
HashTable.TableHash.getDataFileName(i)); + + MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf()); + ImmutableBytesWritable key = new ImmutableBytesWritable(); + ImmutableBytesWritable hash = new ImmutableBytesWritable(); + while(reader.next(key, hash)) { + String keyString = Bytes.toHex(key.get(), key.getOffset(), key.getLength()); + LOG.debug("Key: " + (keyString.isEmpty() ? "-1" : Integer.parseInt(keyString, 16)) + + " Hash: " + Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength())); + + int intKey = -1; + if (key.getLength() > 0) { + intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength()); + } + if (actualHashes.containsKey(intKey)) { + Assert.fail("duplicate key in data files: " + intKey); + } + actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes())); + } + reader.close(); + } + + FileStatus[] files = fs.listStatus(testDir); + for (FileStatus file : files) { + LOG.debug("Output file: " + file.getPath()); + } + + files = fs.listStatus(dataDir); + for (FileStatus file : files) { + LOG.debug("Data file: " + file.getPath()); + } + + if (!expectedHashes.equals(actualHashes)) { + LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes)); + } + Assert.assertEquals(expectedHashes, actualHashes); + + TEST_UTIL.deleteTable(tableName); + TEST_UTIL.cleanupDataTestDirOnTestFS(); + } + + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java new file mode 100644 index 00000000000..a86270f7bb0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counters; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.base.Throwables; + +/** + * Basic test for the SyncTable M/R tool + */ +@Category(LargeTests.class) +public class TestSyncTable { + + private static final Log LOG = LogFactory.getLog(TestSyncTable.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.startMiniMapReduceCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniMapReduceCluster(); + TEST_UTIL.shutdownMiniCluster(); + } + + private static byte[][] generateSplits(int numRows, int numRegions) { + byte[][] splitRows = new byte[numRegions-1][]; + for (int i = 1; i < numRegions; i++) { + splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions); + } + return splitRows; + } + + @Test + public void testSyncTable() throws Exception { + String sourceTableName = "testSourceTable"; + String targetTableName = "testTargetTable"; + Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable"); + + writeTestData(sourceTableName, targetTableName); + hashSourceTable(sourceTableName, testDir); + Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir); + assertEqualTables(90, sourceTableName, targetTableName); + + assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); + assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); + + TEST_UTIL.deleteTable(sourceTableName); + TEST_UTIL.deleteTable(targetTableName); + TEST_UTIL.cleanupDataTestDirOnTestFS(); + } + + private void assertEqualTables(int expectedRows, String sourceTableName, String targetTableName) + throws Exception { + Table sourceTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(sourceTableName)); + Table targetTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(targetTableName)); + + ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); + ResultScanner targetScanner = 
targetTable.getScanner(new Scan()); + + for (int i = 0; i < expectedRows; i++) { + Result sourceRow = sourceScanner.next(); + Result targetRow = targetScanner.next(); + + LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) + + " cells:" + sourceRow); + LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) + + " cells:" + targetRow); + + if (sourceRow == null) { + Assert.fail("Expected " + expectedRows + + " source rows but only found " + i); + } + if (targetRow == null) { + Assert.fail("Expected " + expectedRows + + " target rows but only found " + i); + } + Cell[] sourceCells = sourceRow.rawCells(); + Cell[] targetCells = targetRow.rawCells(); + if (sourceCells.length != targetCells.length) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + + " has " + sourceCells.length + + " cells in source table but " + targetCells.length + + " cells in target table"); + } + for (int j = 0; j < sourceCells.length; j++) { + Cell sourceCell = sourceCells[j]; + Cell targetCell = targetCells[j]; + try { + if (!CellUtil.matchingRow(sourceCell, targetCell)) { + Assert.fail("Rows don't match"); + } + if (!CellUtil.matchingFamily(sourceCell, targetCell)) { + Assert.fail("Families don't match"); + } + if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { + Assert.fail("Qualifiers don't match"); + } + if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { + Assert.fail("Timestamps don't match"); + } + if (!CellUtil.matchingValue(sourceCell, targetCell)) { + Assert.fail("Values don't match"); + } + } catch (Throwable t) { + LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell); + Throwables.propagate(t); + } + } + } + Result sourceRow = sourceScanner.next(); + if (sourceRow != null) { + Assert.fail("Source table has more than " + expectedRows + + " rows. Next row: " + Bytes.toInt(sourceRow.getRow())); + } + Result targetRow = targetScanner.next(); + if (targetRow != null) { + Assert.fail("Target table has more than " + expectedRows + + " rows. 
Next row: " + Bytes.toInt(targetRow.getRow())); + } + sourceScanner.close(); + targetScanner.close(); + sourceTable.close(); + targetTable.close(); + } + + private Counters syncTables(String sourceTableName, String targetTableName, + Path testDir) throws Exception { + SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration()); + int code = syncTable.run(new String[] { + testDir.toString(), + sourceTableName, + targetTableName + }); + assertEquals("sync table job failed", 0, code); + + LOG.info("Sync tables completed"); + return syncTable.counters; + } + + private void hashSourceTable(String sourceTableName, Path testDir) + throws Exception, IOException { + int numHashFiles = 3; + long batchSize = 100; // should be 2 batches per region + int scanBatch = 1; + HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration()); + int code = hashTable.run(new String[] { + "--batchsize=" + batchSize, + "--numhashfiles=" + numHashFiles, + "--scanbatch=" + scanBatch, + sourceTableName, + testDir.toString()}); + assertEquals("hash table job failed", 0, code); + + FileSystem fs = TEST_UTIL.getTestFileSystem(); + + HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); + assertEquals(sourceTableName, tableHash.tableName); + assertEquals(batchSize, tableHash.batchSize); + assertEquals(numHashFiles, tableHash.numHashFiles); + assertEquals(numHashFiles - 1, tableHash.partitions.size()); + + LOG.info("Hash table completed"); + } + + private void writeTestData(String sourceTableName, String targetTableName) + throws Exception { + final byte[] family = Bytes.toBytes("family"); + final byte[] column1 = Bytes.toBytes("c1"); + final byte[] column2 = Bytes.toBytes("c2"); + final byte[] value1 = Bytes.toBytes("val1"); + final byte[] value2 = Bytes.toBytes("val2"); + final byte[] value3 = Bytes.toBytes("val3"); + + int numRows = 100; + int sourceRegions = 10; + int targetRegions = 6; + + HTable sourceTable = TEST_UTIL.createTable(TableName.valueOf(sourceTableName), + family, generateSplits(numRows, sourceRegions)); + + HTable targetTable = TEST_UTIL.createTable(TableName.valueOf(targetTableName), + family, generateSplits(numRows, targetRegions)); + + long timestamp = 1430764183454L; + + int rowIndex = 0; + // a bunch of identical rows + for (; rowIndex < 40; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, value1); + sourcePut.addColumn(family, column2, timestamp, value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp, value1); + targetPut.addColumn(family, column2, timestamp, value2); + targetTable.put(targetPut); + } + // some rows only in the source table + // ROWSWITHDIFFS: 10 + // TARGETMISSINGROWS: 10 + // TARGETMISSINGCELLS: 20 + for (; rowIndex < 50; rowIndex++) { + Put put = new Put(Bytes.toBytes(rowIndex)); + put.addColumn(family, column1, timestamp, value1); + put.addColumn(family, column2, timestamp, value2); + sourceTable.put(put); + } + // some rows only in the target table + // ROWSWITHDIFFS: 10 + // SOURCEMISSINGROWS: 10 + // SOURCEMISSINGCELLS: 20 + for (; rowIndex < 60; rowIndex++) { + Put put = new Put(Bytes.toBytes(rowIndex)); + put.addColumn(family, column1, timestamp, value1); + put.addColumn(family, column2, timestamp, value2); + targetTable.put(put); + } + // some rows with 1 missing cell in target table + // ROWSWITHDIFFS: 10 + // TARGETMISSINGCELLS: 10 + for (; rowIndex < 70; rowIndex++) { + Put 
sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, value1); + sourcePut.addColumn(family, column2, timestamp, value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp, value1); + targetTable.put(targetPut); + } + // some rows with 1 missing cell in source table + // ROWSWITHDIFFS: 10 + // SOURCEMISSINGCELLS: 10 + for (; rowIndex < 80; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, value1); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp, value1); + targetPut.addColumn(family, column2, timestamp, value2); + targetTable.put(targetPut); + } + // some rows differing only in timestamp + // ROWSWITHDIFFS: 10 + // SOURCEMISSINGCELLS: 20 + // TARGETMISSINGCELLS: 20 + for (; rowIndex < 90; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, column1); + sourcePut.addColumn(family, column2, timestamp, value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp+1, column1); + targetPut.addColumn(family, column2, timestamp-1, value2); + targetTable.put(targetPut); + } + // some rows with different values + // ROWSWITHDIFFS: 10 + // DIFFERENTCELLVALUES: 20 + for (; rowIndex < numRows; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, value1); + sourcePut.addColumn(family, column2, timestamp, value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp, value3); + targetPut.addColumn(family, column2, timestamp, value3); + targetTable.put(targetPut); + } + + sourceTable.close(); + targetTable.close(); + } + + +}
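For reference, the end-to-end flow that TestSyncTable exercises can also be driven outside a MiniCluster. The sketch below is not part of this patch: the driver class, table names, and HDFS output path are hypothetical, and it simply chains the two tools through their Tool.run(String[]) methods exactly the way the tests above do.

package org.apache.hadoop.hbase.mapreduce;  // same package as the tools, mirroring the tests

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

/** Hypothetical driver: hash a source table, then dry-run SyncTable against a target. */
public class HashThenSyncExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String hashDir = "hdfs://nn:9000/hashes/tableA";  // placeholder output path

    // Step 1: write batched hashes of the source table (same flags the tests pass to HashTable).
    int hashExit = new HashTable(conf).run(new String[] {
        "--batchsize=8000", "--numhashfiles=3", "tableA", hashDir });
    if (hashExit != 0) {
      System.exit(hashExit);
    }

    // Step 2: compare the target table against those hashes; --dryrun=true only reports
    // counters (ROWSWITHDIFFS, SOURCEMISSINGCELLS, ...) and performs no writes.
    int syncExit = new SyncTable(conf).run(new String[] {
        "--dryrun=true", hashDir, "tableA", "tableA" });
    System.exit(syncExit);
  }
}

Omitting --dryrun (it defaults to false) would let the same SyncTable invocation actually repair the target once the counters from a dry run look sane.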