HBASE-13639 SyncTable - rsync for HBase tables
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
47a9ff5d1f
commit
293506c7cd
|
@ -2590,13 +2590,46 @@ public class Bytes implements Comparable<Bytes> {
|
|||
return result;
|
||||
}
|
||||
|
||||
private static final char[] HEX_CHARS = {
|
||||
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
|
||||
};
|
||||
|
||||
/**
|
||||
* Convert a byte range into a hex string
|
||||
*/
|
||||
public static String toHex(byte[] b, int offset, int length) {
|
||||
checkArgument(length <= Integer.MAX_VALUE / 2);
|
||||
int numChars = length * 2;
|
||||
char[] ch = new char[numChars];
|
||||
for (int i = 0; i < numChars; i += 2)
|
||||
{
|
||||
byte d = b[offset + i/2];
|
||||
ch[i] = HEX_CHARS[(d >> 4) & 0x0F];
|
||||
ch[i+1] = HEX_CHARS[d & 0x0F];
|
||||
}
|
||||
return new String(ch);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a byte array into a hex string
|
||||
* @param b
|
||||
*/
|
||||
public static String toHex(byte[] b) {
|
||||
checkArgument(b.length > 0, "length must be greater than 0");
|
||||
return String.format("%x", new BigInteger(1, b));
|
||||
return toHex(b, 0, b.length);
|
||||
}
|
||||
|
||||
private static int hexCharToNibble(char ch) {
|
||||
if (ch <= '9' && ch >= '0') {
|
||||
return ch - '0';
|
||||
} else if (ch >= 'a' && ch <= 'f') {
|
||||
return ch - 'a' + 10;
|
||||
} else if (ch >= 'A' && ch <= 'F') {
|
||||
return ch - 'A' + 10;
|
||||
}
|
||||
throw new IllegalArgumentException("Invalid hex char: " + ch);
|
||||
}
|
||||
|
||||
private static byte hexCharsToByte(char c1, char c2) {
|
||||
return (byte) ((hexCharToNibble(c1) << 4) | hexCharToNibble(c2));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2605,14 +2638,11 @@ public class Bytes implements Comparable<Bytes> {
|
|||
* @param hex
|
||||
*/
|
||||
public static byte[] fromHex(String hex) {
|
||||
checkArgument(hex.length() > 0, "length must be greater than 0");
|
||||
checkArgument(hex.length() % 2 == 0, "length must be a multiple of 2");
|
||||
// Make sure letters are upper case
|
||||
hex = hex.toUpperCase();
|
||||
byte[] b = new byte[hex.length() / 2];
|
||||
for (int i = 0; i < b.length; i++) {
|
||||
b[i] = (byte)((toBinaryFromHex((byte)hex.charAt(2 * i)) << 4) +
|
||||
toBinaryFromHex((byte)hex.charAt((2 * i + 1))));
|
||||
int len = hex.length();
|
||||
byte[] b = new byte[len / 2];
|
||||
for (int i = 0; i < len; i += 2) {
|
||||
b[i / 2] = hexCharsToByte(hex.charAt(i),hex.charAt(i+1));
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,9 @@ import java.io.DataOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
@ -494,5 +496,51 @@ public class TestBytes extends TestCase {
|
|||
Assert.assertEquals(i, b[i]);
|
||||
}
|
||||
}
|
||||
|
||||
public void testToFromHex() {
|
||||
List<String> testStrings = new ArrayList<String>();
|
||||
testStrings.addAll(Arrays.asList(new String[] {
|
||||
"",
|
||||
"00",
|
||||
"A0",
|
||||
"ff",
|
||||
"FFffFFFFFFFFFF",
|
||||
"12",
|
||||
"0123456789abcdef",
|
||||
"283462839463924623984692834692346ABCDFEDDCA0",
|
||||
}));
|
||||
for (String testString : testStrings)
|
||||
{
|
||||
byte[] byteData = Bytes.fromHex(testString);
|
||||
Assert.assertEquals(testString.length() / 2, byteData.length);
|
||||
String result = Bytes.toHex(byteData);
|
||||
Assert.assertTrue(testString.equalsIgnoreCase(result));
|
||||
}
|
||||
|
||||
List<byte[]> testByteData = new ArrayList<byte[]>();
|
||||
testByteData.addAll(Arrays.asList(new byte[][] {
|
||||
new byte[0],
|
||||
new byte[1],
|
||||
new byte[10],
|
||||
new byte[] {1, 2, 3, 4, 5},
|
||||
new byte[] {(byte) 0xFF},
|
||||
}));
|
||||
Random r = new Random();
|
||||
for (int i = 0; i < 20; i++)
|
||||
{
|
||||
|
||||
byte[] bytes = new byte[r.nextInt(100)];
|
||||
r.nextBytes(bytes);
|
||||
testByteData.add(bytes);
|
||||
}
|
||||
|
||||
for (byte[] testData : testByteData)
|
||||
{
|
||||
String hexString = Bytes.toHex(testData);
|
||||
Assert.assertEquals(testData.length * 2, hexString.length());
|
||||
byte[] result = Bytes.fromHex(hexString);
|
||||
Assert.assertArrayEquals(testData, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,747 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.io.MapFile;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Ordering;
|
||||
|
||||
public class HashTable extends Configured implements Tool {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HashTable.class);
|
||||
|
||||
private static final int DEFAULT_BATCH_SIZE = 8000;
|
||||
|
||||
private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
|
||||
final static String PARTITIONS_FILE_NAME = "partitions";
|
||||
final static String MANIFEST_FILE_NAME = "manifest";
|
||||
final static String HASH_DATA_DIR = "hashes";
|
||||
final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
|
||||
private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
|
||||
|
||||
TableHash tableHash = new TableHash();
|
||||
Path destPath;
|
||||
|
||||
public HashTable(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
public static class TableHash {
|
||||
|
||||
Path hashDir;
|
||||
|
||||
String tableName;
|
||||
String families = null;
|
||||
long batchSize = DEFAULT_BATCH_SIZE;
|
||||
int numHashFiles = 0;
|
||||
byte[] startRow = HConstants.EMPTY_START_ROW;
|
||||
byte[] stopRow = HConstants.EMPTY_END_ROW;
|
||||
int scanBatch = 0;
|
||||
int versions = -1;
|
||||
long startTime = 0;
|
||||
long endTime = 0;
|
||||
|
||||
List<ImmutableBytesWritable> partitions;
|
||||
|
||||
public static TableHash read(Configuration conf, Path hashDir) throws IOException {
|
||||
TableHash tableHash = new TableHash();
|
||||
FileSystem fs = hashDir.getFileSystem(conf);
|
||||
tableHash.hashDir = hashDir;
|
||||
tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
|
||||
tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
|
||||
return tableHash;
|
||||
}
|
||||
|
||||
void writePropertiesFile(FileSystem fs, Path path) throws IOException {
|
||||
Properties p = new Properties();
|
||||
p.setProperty("table", tableName);
|
||||
if (families != null) {
|
||||
p.setProperty("columnFamilies", families);
|
||||
}
|
||||
p.setProperty("targetBatchSize", Long.toString(batchSize));
|
||||
p.setProperty("numHashFiles", Integer.toString(numHashFiles));
|
||||
if (!isTableStartRow(startRow)) {
|
||||
p.setProperty("startRowHex", Bytes.toHex(startRow));
|
||||
}
|
||||
if (!isTableEndRow(stopRow)) {
|
||||
p.setProperty("stopRowHex", Bytes.toHex(stopRow));
|
||||
}
|
||||
if (scanBatch > 0) {
|
||||
p.setProperty("scanBatch", Integer.toString(scanBatch));
|
||||
}
|
||||
if (versions >= 0) {
|
||||
p.setProperty("versions", Integer.toString(versions));
|
||||
}
|
||||
if (startTime != 0) {
|
||||
p.setProperty("startTimestamp", Long.toString(startTime));
|
||||
}
|
||||
if (endTime != 0) {
|
||||
p.setProperty("endTimestamp", Long.toString(endTime));
|
||||
}
|
||||
|
||||
FSDataOutputStream out = fs.create(path);
|
||||
p.store(new OutputStreamWriter(out, Charsets.UTF_8), null);
|
||||
out.close();
|
||||
}
|
||||
|
||||
void readPropertiesFile(FileSystem fs, Path path) throws IOException {
|
||||
FSDataInputStream in = fs.open(path);
|
||||
Properties p = new Properties();
|
||||
p.load(new InputStreamReader(in, Charsets.UTF_8));
|
||||
in.close();
|
||||
|
||||
tableName = p.getProperty("table");
|
||||
families = p.getProperty("columnFamilies");
|
||||
batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
|
||||
numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
|
||||
|
||||
String startRowHex = p.getProperty("startRowHex");
|
||||
if (startRowHex != null) {
|
||||
startRow = Bytes.fromHex(startRowHex);
|
||||
}
|
||||
String stopRowHex = p.getProperty("stopRowHex");
|
||||
if (stopRowHex != null) {
|
||||
stopRow = Bytes.fromHex(stopRowHex);
|
||||
}
|
||||
|
||||
String scanBatchString = p.getProperty("scanBatch");
|
||||
if (scanBatchString != null) {
|
||||
scanBatch = Integer.parseInt(scanBatchString);
|
||||
}
|
||||
|
||||
String versionString = p.getProperty("versions");
|
||||
if (versionString != null) {
|
||||
versions = Integer.parseInt(versionString);
|
||||
}
|
||||
|
||||
String startTimeString = p.getProperty("startTimestamp");
|
||||
if (startTimeString != null) {
|
||||
startTime = Long.parseLong(startTimeString);
|
||||
}
|
||||
|
||||
String endTimeString = p.getProperty("endTimestamp");
|
||||
if (endTimeString != null) {
|
||||
endTime = Long.parseLong(endTimeString);
|
||||
}
|
||||
}
|
||||
|
||||
Scan initScan() throws IOException {
|
||||
Scan scan = new Scan();
|
||||
scan.setCacheBlocks(false);
|
||||
if (startTime != 0 || endTime != 0) {
|
||||
scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
|
||||
}
|
||||
if (scanBatch > 0) {
|
||||
scan.setBatch(scanBatch);
|
||||
}
|
||||
if (versions >= 0) {
|
||||
scan.setMaxVersions(versions);
|
||||
}
|
||||
if (!isTableStartRow(startRow)) {
|
||||
scan.setStartRow(startRow);
|
||||
}
|
||||
if (!isTableEndRow(stopRow)) {
|
||||
scan.setStopRow(stopRow);
|
||||
}
|
||||
if(families != null) {
|
||||
for(String fam : families.split(",")) {
|
||||
scan.addFamily(Bytes.toBytes(fam));
|
||||
}
|
||||
}
|
||||
return scan;
|
||||
}
|
||||
|
||||
/**
|
||||
* Choose partitions between row ranges to hash to a single output file
|
||||
* Selects region boundaries that fall within the scan range, and groups them
|
||||
* into the desired number of partitions.
|
||||
*/
|
||||
void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
|
||||
List<byte[]> startKeys = new ArrayList<byte[]>();
|
||||
for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
|
||||
byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
|
||||
byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
|
||||
|
||||
// if scan begins after this region, or starts before this region, then drop this region
|
||||
// in other words:
|
||||
// IF (scan begins before the end of this region
|
||||
// AND scan ends before the start of this region)
|
||||
// THEN include this region
|
||||
if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
|
||||
|| Bytes.compareTo(startRow, regionEndKey) < 0)
|
||||
&& (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
|
||||
|| Bytes.compareTo(stopRow, regionStartKey) > 0)) {
|
||||
startKeys.add(regionStartKey);
|
||||
}
|
||||
}
|
||||
|
||||
int numRegions = startKeys.size();
|
||||
if (numHashFiles == 0) {
|
||||
numHashFiles = numRegions / 100;
|
||||
}
|
||||
if (numHashFiles == 0) {
|
||||
numHashFiles = 1;
|
||||
}
|
||||
if (numHashFiles > numRegions) {
|
||||
// can't partition within regions
|
||||
numHashFiles = numRegions;
|
||||
}
|
||||
|
||||
// choose a subset of start keys to group regions into ranges
|
||||
partitions = new ArrayList<ImmutableBytesWritable>(numHashFiles - 1);
|
||||
// skip the first start key as it is not a partition between ranges.
|
||||
for (long i = 1; i < numHashFiles; i++) {
|
||||
int splitIndex = (int) (numRegions * i / numHashFiles);
|
||||
partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
|
||||
}
|
||||
}
|
||||
|
||||
void writePartitionFile(Configuration conf, Path path) throws IOException {
|
||||
FileSystem fs = path.getFileSystem(conf);
|
||||
@SuppressWarnings("deprecation")
|
||||
SequenceFile.Writer writer = SequenceFile.createWriter(
|
||||
fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
|
||||
|
||||
for (int i = 0; i < partitions.size(); i++) {
|
||||
writer.append(partitions.get(i), NullWritable.get());
|
||||
}
|
||||
writer.close();
|
||||
}
|
||||
|
||||
private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
|
||||
throws IOException {
|
||||
@SuppressWarnings("deprecation")
|
||||
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
|
||||
ImmutableBytesWritable key = new ImmutableBytesWritable();
|
||||
partitions = new ArrayList<ImmutableBytesWritable>();
|
||||
while (reader.next(key)) {
|
||||
partitions.add(new ImmutableBytesWritable(key.copyBytes()));
|
||||
}
|
||||
reader.close();
|
||||
|
||||
if (!Ordering.natural().isOrdered(partitions)) {
|
||||
throw new IOException("Partitions are not ordered!");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("tableName=").append(tableName);
|
||||
if (families != null) {
|
||||
sb.append(", families=").append(families);
|
||||
}
|
||||
sb.append(", batchSize=").append(batchSize);
|
||||
sb.append(", numHashFiles=").append(numHashFiles);
|
||||
if (!isTableStartRow(startRow)) {
|
||||
sb.append(", startRowHex=").append(Bytes.toHex(startRow));
|
||||
}
|
||||
if (!isTableEndRow(stopRow)) {
|
||||
sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
|
||||
}
|
||||
if (scanBatch >= 0) {
|
||||
sb.append(", scanBatch=").append(scanBatch);
|
||||
}
|
||||
if (versions >= 0) {
|
||||
sb.append(", versions=").append(versions);
|
||||
}
|
||||
if (startTime != 0) {
|
||||
sb.append("startTime=").append(startTime);
|
||||
}
|
||||
if (endTime != 0) {
|
||||
sb.append("endTime=").append(endTime);
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
static String getDataFileName(int hashFileIndex) {
|
||||
return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a TableHash.Reader starting at the first hash at or after the given key.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
|
||||
throws IOException {
|
||||
return new Reader(conf, startKey);
|
||||
}
|
||||
|
||||
public class Reader implements java.io.Closeable {
|
||||
private final Configuration conf;
|
||||
|
||||
private int hashFileIndex;
|
||||
private MapFile.Reader mapFileReader;
|
||||
|
||||
private boolean cachedNext;
|
||||
private ImmutableBytesWritable key;
|
||||
private ImmutableBytesWritable hash;
|
||||
|
||||
Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
|
||||
this.conf = conf;
|
||||
int partitionIndex = Collections.binarySearch(partitions, startKey);
|
||||
if (partitionIndex >= 0) {
|
||||
// if the key is equal to a partition, then go the file after that partition
|
||||
hashFileIndex = partitionIndex+1;
|
||||
} else {
|
||||
// if the key is between partitions, then go to the file between those partitions
|
||||
hashFileIndex = -1-partitionIndex;
|
||||
}
|
||||
openHashFile();
|
||||
|
||||
// MapFile's don't make it easy to seek() so that the subsequent next() returns
|
||||
// the desired key/value pair. So we cache it for the first call of next().
|
||||
hash = new ImmutableBytesWritable();
|
||||
key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
|
||||
if (key == null) {
|
||||
cachedNext = false;
|
||||
hash = null;
|
||||
} else {
|
||||
cachedNext = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the next key/hash pair.
|
||||
* Returns true if such a pair exists and false when at the end of the data.
|
||||
*/
|
||||
public boolean next() throws IOException {
|
||||
if (cachedNext) {
|
||||
cachedNext = false;
|
||||
return true;
|
||||
}
|
||||
key = new ImmutableBytesWritable();
|
||||
hash = new ImmutableBytesWritable();
|
||||
while (true) {
|
||||
boolean hasNext = mapFileReader.next(key, hash);
|
||||
if (hasNext) {
|
||||
return true;
|
||||
}
|
||||
hashFileIndex++;
|
||||
if (hashFileIndex < TableHash.this.numHashFiles) {
|
||||
mapFileReader.close();
|
||||
openHashFile();
|
||||
} else {
|
||||
key = null;
|
||||
hash = null;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current key
|
||||
* @return the current key or null if there is no current key
|
||||
*/
|
||||
public ImmutableBytesWritable getCurrentKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current hash
|
||||
* @return the current hash or null if there is no current hash
|
||||
*/
|
||||
public ImmutableBytesWritable getCurrentHash() {
|
||||
return hash;
|
||||
}
|
||||
|
||||
private void openHashFile() throws IOException {
|
||||
if (mapFileReader != null) {
|
||||
mapFileReader.close();
|
||||
}
|
||||
Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
|
||||
Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
|
||||
mapFileReader = new MapFile.Reader(dataFile, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
mapFileReader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static boolean isTableStartRow(byte[] row) {
|
||||
return Bytes.equals(HConstants.EMPTY_START_ROW, row);
|
||||
}
|
||||
|
||||
static boolean isTableEndRow(byte[] row) {
|
||||
return Bytes.equals(HConstants.EMPTY_END_ROW, row);
|
||||
}
|
||||
|
||||
public Job createSubmittableJob(String[] args) throws IOException {
|
||||
Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
|
||||
generatePartitions(partitionsPath);
|
||||
|
||||
Job job = Job.getInstance(getConf(),
|
||||
getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
|
||||
Configuration jobConf = job.getConfiguration();
|
||||
jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
|
||||
job.setJarByClass(HashTable.class);
|
||||
|
||||
TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
|
||||
HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
|
||||
|
||||
// use a TotalOrderPartitioner and reducers to group region output into hash files
|
||||
job.setPartitionerClass(TotalOrderPartitioner.class);
|
||||
TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
|
||||
job.setReducerClass(Reducer.class); // identity reducer
|
||||
job.setNumReduceTasks(tableHash.numHashFiles);
|
||||
job.setOutputKeyClass(ImmutableBytesWritable.class);
|
||||
job.setOutputValueClass(ImmutableBytesWritable.class);
|
||||
job.setOutputFormatClass(MapFileOutputFormat.class);
|
||||
FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
private void generatePartitions(Path partitionsPath) throws IOException {
|
||||
Connection connection = ConnectionFactory.createConnection(getConf());
|
||||
Pair<byte[][], byte[][]> regionKeys
|
||||
= connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
|
||||
connection.close();
|
||||
|
||||
tableHash.selectPartitions(regionKeys);
|
||||
LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
|
||||
|
||||
tableHash.writePartitionFile(getConf(), partitionsPath);
|
||||
}
|
||||
|
||||
static class ResultHasher {
|
||||
private MessageDigest digest;
|
||||
|
||||
private boolean batchStarted = false;
|
||||
private ImmutableBytesWritable batchStartKey;
|
||||
private ImmutableBytesWritable batchHash;
|
||||
private long batchSize = 0;
|
||||
|
||||
|
||||
public ResultHasher() {
|
||||
try {
|
||||
digest = MessageDigest.getInstance("MD5");
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void startBatch(ImmutableBytesWritable row) {
|
||||
if (batchStarted) {
|
||||
throw new RuntimeException("Cannot start new batch without finishing existing one.");
|
||||
}
|
||||
batchStarted = true;
|
||||
batchSize = 0;
|
||||
batchStartKey = row;
|
||||
batchHash = null;
|
||||
}
|
||||
|
||||
public void hashResult(Result result) {
|
||||
if (!batchStarted) {
|
||||
throw new RuntimeException("Cannot add to batch that has not been started.");
|
||||
}
|
||||
for (Cell cell : result.rawCells()) {
|
||||
int rowLength = cell.getRowLength();
|
||||
int familyLength = cell.getFamilyLength();
|
||||
int qualifierLength = cell.getQualifierLength();
|
||||
int valueLength = cell.getValueLength();
|
||||
digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
|
||||
digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
|
||||
digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
|
||||
long ts = cell.getTimestamp();
|
||||
for (int i = 8; i > 0; i--) {
|
||||
digest.update((byte) ts);
|
||||
ts >>>= 8;
|
||||
}
|
||||
digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
|
||||
|
||||
batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
|
||||
}
|
||||
}
|
||||
|
||||
public void finishBatch() {
|
||||
if (!batchStarted) {
|
||||
throw new RuntimeException("Cannot finish batch that has not started.");
|
||||
}
|
||||
batchStarted = false;
|
||||
batchHash = new ImmutableBytesWritable(digest.digest());
|
||||
}
|
||||
|
||||
public boolean isBatchStarted() {
|
||||
return batchStarted;
|
||||
}
|
||||
|
||||
public ImmutableBytesWritable getBatchStartKey() {
|
||||
return batchStartKey;
|
||||
}
|
||||
|
||||
public ImmutableBytesWritable getBatchHash() {
|
||||
return batchHash;
|
||||
}
|
||||
|
||||
public long getBatchSize() {
|
||||
return batchSize;
|
||||
}
|
||||
}
|
||||
|
||||
public static class HashMapper
|
||||
extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
|
||||
|
||||
private ResultHasher hasher;
|
||||
private long targetBatchSize;
|
||||
|
||||
private ImmutableBytesWritable currentRow;
|
||||
|
||||
@Override
|
||||
protected void setup(Context context) throws IOException, InterruptedException {
|
||||
targetBatchSize = context.getConfiguration()
|
||||
.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
|
||||
hasher = new ResultHasher();
|
||||
|
||||
TableSplit split = (TableSplit) context.getInputSplit();
|
||||
hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void map(ImmutableBytesWritable key, Result value, Context context)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
if (currentRow == null || !currentRow.equals(key)) {
|
||||
currentRow = new ImmutableBytesWritable(key); // not immutable
|
||||
|
||||
if (hasher.getBatchSize() >= targetBatchSize) {
|
||||
hasher.finishBatch();
|
||||
context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
|
||||
hasher.startBatch(currentRow);
|
||||
}
|
||||
}
|
||||
|
||||
hasher.hashResult(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanup(Context context) throws IOException, InterruptedException {
|
||||
hasher.finishBatch();
|
||||
context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
|
||||
}
|
||||
}
|
||||
|
||||
private void writeTempManifestFile() throws IOException {
|
||||
Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
|
||||
FileSystem fs = tempManifestPath.getFileSystem(getConf());
|
||||
tableHash.writePropertiesFile(fs, tempManifestPath);
|
||||
}
|
||||
|
||||
private void completeManifest() throws IOException {
|
||||
Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
|
||||
Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
|
||||
FileSystem fs = tempManifestPath.getFileSystem(getConf());
|
||||
fs.rename(tempManifestPath, manifestPath);
|
||||
}
|
||||
|
||||
private static final int NUM_ARGS = 2;
|
||||
private static void printUsage(final String errorMsg) {
|
||||
if (errorMsg != null && errorMsg.length() > 0) {
|
||||
System.err.println("ERROR: " + errorMsg);
|
||||
System.err.println();
|
||||
}
|
||||
System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
|
||||
System.err.println();
|
||||
System.err.println("Options:");
|
||||
System.err.println(" batchsize the target amount of bytes to hash in each batch");
|
||||
System.err.println(" rows are added to the batch until this size is reached");
|
||||
System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
|
||||
System.err.println(" numhashfiles the number of hash files to create");
|
||||
System.err.println(" if set to fewer than number of regions then");
|
||||
System.err.println(" the job will create this number of reducers");
|
||||
System.err.println(" (defaults to 1/100 of regions -- at least 1)");
|
||||
System.err.println(" startrow the start row");
|
||||
System.err.println(" stoprow the stop row");
|
||||
System.err.println(" starttime beginning of the time range (unixtime in millis)");
|
||||
System.err.println(" without endtime means from starttime to forever");
|
||||
System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
|
||||
System.err.println(" scanbatch scanner batch size to support intra row scans");
|
||||
System.err.println(" versions number of cell versions to include");
|
||||
System.err.println(" families comma-separated list of families to include");
|
||||
System.err.println();
|
||||
System.err.println("Args:");
|
||||
System.err.println(" tablename Name of the table to hash");
|
||||
System.err.println(" outputpath Filesystem path to put the output data");
|
||||
System.err.println();
|
||||
System.err.println("Examples:");
|
||||
System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
|
||||
System.err.println(" $ bin/hbase " +
|
||||
"org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
|
||||
+ " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
|
||||
+ " TestTable /hashes/testTable");
|
||||
}
|
||||
|
||||
private boolean doCommandLine(final String[] args) {
|
||||
if (args.length < NUM_ARGS) {
|
||||
printUsage(null);
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
|
||||
tableHash.tableName = args[args.length-2];
|
||||
destPath = new Path(args[args.length-1]);
|
||||
|
||||
for (int i = 0; i < args.length - NUM_ARGS; i++) {
|
||||
String cmd = args[i];
|
||||
if (cmd.equals("-h") || cmd.startsWith("--h")) {
|
||||
printUsage(null);
|
||||
return false;
|
||||
}
|
||||
|
||||
final String batchSizeArgKey = "--batchsize=";
|
||||
if (cmd.startsWith(batchSizeArgKey)) {
|
||||
tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
final String numHashFilesArgKey = "--numhashfiles=";
|
||||
if (cmd.startsWith(numHashFilesArgKey)) {
|
||||
tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
final String startRowArgKey = "--startrow=";
|
||||
if (cmd.startsWith(startRowArgKey)) {
|
||||
tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
final String stopRowArgKey = "--stoprow=";
|
||||
if (cmd.startsWith(stopRowArgKey)) {
|
||||
tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
final String startTimeArgKey = "--starttime=";
|
||||
if (cmd.startsWith(startTimeArgKey)) {
|
||||
tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
final String endTimeArgKey = "--endtime=";
|
||||
if (cmd.startsWith(endTimeArgKey)) {
|
||||
tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
final String scanBatchArgKey = "--scanbatch=";
|
||||
if (cmd.startsWith(scanBatchArgKey)) {
|
||||
tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
final String versionsArgKey = "--versions=";
|
||||
if (cmd.startsWith(versionsArgKey)) {
|
||||
tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
final String familiesArgKey = "--families=";
|
||||
if (cmd.startsWith(familiesArgKey)) {
|
||||
tableHash.families = cmd.substring(familiesArgKey.length());
|
||||
continue;
|
||||
}
|
||||
|
||||
printUsage("Invalid argument '" + cmd + "'");
|
||||
return false;
|
||||
}
|
||||
if ((tableHash.startTime != 0 || tableHash.endTime != 0)
|
||||
&& (tableHash.startTime >= tableHash.endTime)) {
|
||||
printUsage("Invalid time range filter: starttime="
|
||||
+ tableHash.startTime + " >= endtime=" + tableHash.endTime);
|
||||
return false;
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
printUsage("Can't start because " + e.getMessage());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Main entry point.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
|
||||
System.exit(ret);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
|
||||
if (!doCommandLine(otherArgs)) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
Job job = createSubmittableJob(otherArgs);
|
||||
writeTempManifestFile();
|
||||
if (!job.waitForCompletion(true)) {
|
||||
LOG.info("Map-reduce job failed!");
|
||||
return 1;
|
||||
}
|
||||
completeManifest();
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,773 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Iterators;
|
||||
|
||||
public class SyncTable extends Configured implements Tool {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(SyncTable.class);
|
||||
|
||||
static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
|
||||
static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
|
||||
static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
|
||||
static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
|
||||
static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
|
||||
static final String DRY_RUN_CONF_KEY="sync.table.dry.run";
|
||||
|
||||
Path sourceHashDir;
|
||||
String sourceTableName;
|
||||
String targetTableName;
|
||||
|
||||
String sourceZkCluster;
|
||||
String targetZkCluster;
|
||||
boolean dryRun;
|
||||
|
||||
Counters counters;
|
||||
|
||||
public SyncTable(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
public Job createSubmittableJob(String[] args) throws IOException {
|
||||
FileSystem fs = sourceHashDir.getFileSystem(getConf());
|
||||
if (!fs.exists(sourceHashDir)) {
|
||||
throw new IOException("Source hash dir not found: " + sourceHashDir);
|
||||
}
|
||||
|
||||
HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
|
||||
LOG.info("Read source hash manifest: " + tableHash);
|
||||
LOG.info("Read " + tableHash.partitions.size() + " partition keys");
|
||||
if (!tableHash.tableName.equals(sourceTableName)) {
|
||||
LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
|
||||
+ tableHash.tableName + " but job is reading from: " + sourceTableName);
|
||||
}
|
||||
if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
|
||||
throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
|
||||
+ " should be 1 more than the number of partition keys. However, the manifest file "
|
||||
+ " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
|
||||
+ " found in the partitions file is " + tableHash.partitions.size());
|
||||
}
|
||||
|
||||
Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
|
||||
int dataSubdirCount = 0;
|
||||
for (FileStatus file : fs.listStatus(dataDir)) {
|
||||
if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
|
||||
dataSubdirCount++;
|
||||
}
|
||||
}
|
||||
|
||||
if (dataSubdirCount != tableHash.numHashFiles) {
|
||||
throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
|
||||
+ " should be 1 more than the number of partition keys. However, the number of data dirs"
|
||||
+ " found is " + dataSubdirCount + " but the number of partition keys"
|
||||
+ " found in the partitions file is " + tableHash.partitions.size());
|
||||
}
|
||||
|
||||
Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
|
||||
"syncTable_" + sourceTableName + "-" + targetTableName));
|
||||
Configuration jobConf = job.getConfiguration();
|
||||
job.setJarByClass(HashTable.class);
|
||||
jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
|
||||
jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
|
||||
jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
|
||||
if (sourceZkCluster != null) {
|
||||
jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
|
||||
}
|
||||
if (targetZkCluster != null) {
|
||||
jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
|
||||
}
|
||||
jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
|
||||
|
||||
TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
|
||||
SyncMapper.class, null, null, job);
|
||||
|
||||
job.setNumReduceTasks(0);
|
||||
|
||||
if (dryRun) {
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
} else {
|
||||
// No reducers. Just write straight to table. Call initTableReducerJob
|
||||
// because it sets up the TableOutputFormat.
|
||||
TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
|
||||
targetZkCluster, null, null);
|
||||
|
||||
// would be nice to add an option for bulk load instead
|
||||
}
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
|
||||
Path sourceHashDir;
|
||||
|
||||
Connection sourceConnection;
|
||||
Connection targetConnection;
|
||||
Table sourceTable;
|
||||
Table targetTable;
|
||||
boolean dryRun;
|
||||
|
||||
HashTable.TableHash sourceTableHash;
|
||||
HashTable.TableHash.Reader sourceHashReader;
|
||||
ImmutableBytesWritable currentSourceHash;
|
||||
ImmutableBytesWritable nextSourceKey;
|
||||
HashTable.ResultHasher targetHasher;
|
||||
|
||||
Throwable mapperException;
|
||||
|
||||
public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
|
||||
SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
|
||||
MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
|
||||
|
||||
@Override
|
||||
protected void setup(Context context) throws IOException {
|
||||
|
||||
Configuration conf = context.getConfiguration();
|
||||
sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
|
||||
sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY);
|
||||
targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY);
|
||||
sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
|
||||
targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
|
||||
dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
|
||||
|
||||
sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
|
||||
LOG.info("Read source hash manifest: " + sourceTableHash);
|
||||
LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
|
||||
|
||||
TableSplit split = (TableSplit) context.getInputSplit();
|
||||
ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
|
||||
|
||||
sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
|
||||
findNextKeyHashPair();
|
||||
|
||||
// create a hasher, but don't start it right away
|
||||
// instead, find the first hash batch at or after the start row
|
||||
// and skip any rows that come before. they will be caught by the previous task
|
||||
targetHasher = new HashTable.ResultHasher();
|
||||
}
|
||||
|
||||
private static Connection openConnection(Configuration conf, String zkClusterConfKey)
|
||||
throws IOException {
|
||||
Configuration clusterConf = new Configuration(conf);
|
||||
String zkCluster = conf.get(zkClusterConfKey);
|
||||
if (zkCluster != null) {
|
||||
ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster);
|
||||
}
|
||||
return ConnectionFactory.createConnection(clusterConf);
|
||||
}
|
||||
|
||||
private static Table openTable(Connection connection, Configuration conf,
|
||||
String tableNameConfKey) throws IOException {
|
||||
return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to read the next source key/hash pair.
|
||||
* If there are no more, set nextSourceKey to null
|
||||
*/
|
||||
private void findNextKeyHashPair() throws IOException {
|
||||
boolean hasNext = sourceHashReader.next();
|
||||
if (hasNext) {
|
||||
nextSourceKey = sourceHashReader.getCurrentKey();
|
||||
} else {
|
||||
// no more keys - last hash goes to the end
|
||||
nextSourceKey = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void map(ImmutableBytesWritable key, Result value, Context context)
|
||||
throws IOException, InterruptedException {
|
||||
try {
|
||||
// first, finish any hash batches that end before the scanned row
|
||||
while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
|
||||
moveToNextBatch(context);
|
||||
}
|
||||
|
||||
// next, add the scanned row (as long as we've reached the first batch)
|
||||
if (targetHasher.isBatchStarted()) {
|
||||
targetHasher.hashResult(value);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
mapperException = t;
|
||||
Throwables.propagateIfInstanceOf(t, IOException.class);
|
||||
Throwables.propagateIfInstanceOf(t, InterruptedException.class);
|
||||
Throwables.propagate(t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If there is an open hash batch, complete it and sync if there are diffs.
|
||||
* Start a new batch, and seek to read the
|
||||
*/
|
||||
private void moveToNextBatch(Context context) throws IOException, InterruptedException {
|
||||
if (targetHasher.isBatchStarted()) {
|
||||
finishBatchAndCompareHashes(context);
|
||||
}
|
||||
targetHasher.startBatch(nextSourceKey);
|
||||
currentSourceHash = sourceHashReader.getCurrentHash();
|
||||
|
||||
findNextKeyHashPair();
|
||||
}
|
||||
|
||||
/**
|
||||
* Finish the currently open hash batch.
|
||||
* Compare the target hash to the given source hash.
|
||||
* If they do not match, then sync the covered key range.
|
||||
*/
|
||||
private void finishBatchAndCompareHashes(Context context)
|
||||
throws IOException, InterruptedException {
|
||||
targetHasher.finishBatch();
|
||||
context.getCounter(Counter.BATCHES).increment(1);
|
||||
if (targetHasher.getBatchSize() == 0) {
|
||||
context.getCounter(Counter.EMPTY_BATCHES).increment(1);
|
||||
}
|
||||
ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
|
||||
if (targetHash.equals(currentSourceHash)) {
|
||||
context.getCounter(Counter.HASHES_MATCHED).increment(1);
|
||||
} else {
|
||||
context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
|
||||
|
||||
ImmutableBytesWritable stopRow = nextSourceKey == null
|
||||
? new ImmutableBytesWritable(sourceTableHash.stopRow)
|
||||
: nextSourceKey;
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey())
|
||||
+ " to " + toHex(stopRow)
|
||||
+ " sourceHash: " + toHex(currentSourceHash)
|
||||
+ " targetHash: " + toHex(targetHash));
|
||||
}
|
||||
|
||||
syncRange(context, targetHasher.getBatchStartKey(), stopRow);
|
||||
}
|
||||
}
|
||||
private static String toHex(ImmutableBytesWritable bytes) {
|
||||
return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
|
||||
}
|
||||
|
||||
private static final CellScanner EMPTY_CELL_SCANNER
|
||||
= new CellScanner(Iterators.<Result>emptyIterator());
|
||||
|
||||
/**
|
||||
* Rescan the given range directly from the source and target tables.
|
||||
* Count and log differences, and if this is not a dry run, output Puts and Deletes
|
||||
* to make the target table match the source table for this range
|
||||
*/
|
||||
private void syncRange(Context context, ImmutableBytesWritable startRow,
|
||||
ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
|
||||
|
||||
Scan scan = sourceTableHash.initScan();
|
||||
scan.setStartRow(startRow.copyBytes());
|
||||
scan.setStopRow(stopRow.copyBytes());
|
||||
|
||||
ResultScanner sourceScanner = sourceTable.getScanner(scan);
|
||||
CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
|
||||
|
||||
ResultScanner targetScanner = targetTable.getScanner(scan);
|
||||
CellScanner targetCells = new CellScanner(targetScanner.iterator());
|
||||
|
||||
boolean rangeMatched = true;
|
||||
byte[] nextSourceRow = sourceCells.nextRow();
|
||||
byte[] nextTargetRow = targetCells.nextRow();
|
||||
while(nextSourceRow != null || nextTargetRow != null) {
|
||||
boolean rowMatched;
|
||||
int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
|
||||
if (rowComparison < 0) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
|
||||
}
|
||||
context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
|
||||
|
||||
rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
|
||||
nextSourceRow = sourceCells.nextRow(); // advance only source to next row
|
||||
} else if (rowComparison > 0) {
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
|
||||
}
|
||||
context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
|
||||
|
||||
rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
|
||||
nextTargetRow = targetCells.nextRow(); // advance only target to next row
|
||||
} else {
|
||||
// current row is the same on both sides, compare cell by cell
|
||||
rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
|
||||
nextSourceRow = sourceCells.nextRow();
|
||||
nextTargetRow = targetCells.nextRow();
|
||||
}
|
||||
|
||||
if (!rowMatched) {
|
||||
rangeMatched = false;
|
||||
}
|
||||
}
|
||||
|
||||
sourceScanner.close();
|
||||
targetScanner.close();
|
||||
|
||||
context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
|
||||
.increment(1);
|
||||
}
|
||||
|
||||
private static class CellScanner {
|
||||
private final Iterator<Result> results;
|
||||
|
||||
private byte[] currentRow;
|
||||
private Result currentRowResult;
|
||||
private int nextCellInRow;
|
||||
|
||||
private Result nextRowResult;
|
||||
|
||||
public CellScanner(Iterator<Result> results) {
|
||||
this.results = results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Advance to the next row and return its row key.
|
||||
* Returns null iff there are no more rows.
|
||||
*/
|
||||
public byte[] nextRow() {
|
||||
if (nextRowResult == null) {
|
||||
// no cached row - check scanner for more
|
||||
while (results.hasNext()) {
|
||||
nextRowResult = results.next();
|
||||
Cell nextCell = nextRowResult.rawCells()[0];
|
||||
if (currentRow == null
|
||||
|| !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
|
||||
nextCell.getRowOffset(), nextCell.getRowLength())) {
|
||||
// found next row
|
||||
break;
|
||||
} else {
|
||||
// found another result from current row, keep scanning
|
||||
nextRowResult = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (nextRowResult == null) {
|
||||
// end of data, no more rows
|
||||
currentRowResult = null;
|
||||
currentRow = null;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// advance to cached result for next row
|
||||
currentRowResult = nextRowResult;
|
||||
nextCellInRow = 0;
|
||||
currentRow = currentRowResult.getRow();
|
||||
nextRowResult = null;
|
||||
return currentRow;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the next Cell in the current row or null iff none remain.
|
||||
*/
|
||||
public Cell nextCellInRow() {
|
||||
if (currentRowResult == null) {
|
||||
// nothing left in current row
|
||||
return null;
|
||||
}
|
||||
|
||||
Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
|
||||
nextCellInRow++;
|
||||
if (nextCellInRow == currentRowResult.size()) {
|
||||
if (results.hasNext()) {
|
||||
Result result = results.next();
|
||||
Cell cell = result.rawCells()[0];
|
||||
if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
|
||||
cell.getRowOffset(), cell.getRowLength())) {
|
||||
// result is part of current row
|
||||
currentRowResult = result;
|
||||
nextCellInRow = 0;
|
||||
} else {
|
||||
// result is part of next row, cache it
|
||||
nextRowResult = result;
|
||||
// current row is complete
|
||||
currentRowResult = null;
|
||||
}
|
||||
} else {
|
||||
// end of data
|
||||
currentRowResult = null;
|
||||
}
|
||||
}
|
||||
return nextCell;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare the cells for the given row from the source and target tables.
|
||||
* Count and log any differences.
|
||||
* If not a dry run, output a Put and/or Delete needed to sync the target table
|
||||
* to match the source table.
|
||||
*/
|
||||
private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
|
||||
CellScanner targetCells) throws IOException, InterruptedException {
|
||||
Put put = null;
|
||||
Delete delete = null;
|
||||
long matchingCells = 0;
|
||||
boolean matchingRow = true;
|
||||
Cell sourceCell = sourceCells.nextCellInRow();
|
||||
Cell targetCell = targetCells.nextCellInRow();
|
||||
while (sourceCell != null || targetCell != null) {
|
||||
|
||||
int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
|
||||
if (cellKeyComparison < 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Target missing cell: " + sourceCell);
|
||||
}
|
||||
context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
|
||||
matchingRow = false;
|
||||
|
||||
if (!dryRun) {
|
||||
if (put == null) {
|
||||
put = new Put(rowKey);
|
||||
}
|
||||
put.add(sourceCell);
|
||||
}
|
||||
|
||||
sourceCell = sourceCells.nextCellInRow();
|
||||
} else if (cellKeyComparison > 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Source missing cell: " + targetCell);
|
||||
}
|
||||
context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
|
||||
matchingRow = false;
|
||||
|
||||
if (!dryRun) {
|
||||
if (delete == null) {
|
||||
delete = new Delete(rowKey);
|
||||
}
|
||||
// add a tombstone to exactly match the target cell that is missing on the source
|
||||
delete.addColumn(CellUtil.cloneFamily(targetCell),
|
||||
CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
|
||||
}
|
||||
|
||||
targetCell = targetCells.nextCellInRow();
|
||||
} else {
|
||||
// the cell keys are equal, now check values
|
||||
if (CellUtil.matchingValue(sourceCell, targetCell)) {
|
||||
matchingCells++;
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Different values: ");
|
||||
LOG.debug(" source cell: " + sourceCell
|
||||
+ " value: " + Bytes.toHex(sourceCell.getValueArray(),
|
||||
sourceCell.getValueOffset(), sourceCell.getValueLength()));
|
||||
LOG.debug(" target cell: " + targetCell
|
||||
+ " value: " + Bytes.toHex(targetCell.getValueArray(),
|
||||
targetCell.getValueOffset(), targetCell.getValueLength()));
|
||||
}
|
||||
context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
|
||||
matchingRow = false;
|
||||
|
||||
if (!dryRun) {
|
||||
// overwrite target cell
|
||||
if (put == null) {
|
||||
put = new Put(rowKey);
|
||||
}
|
||||
put.add(sourceCell);
|
||||
}
|
||||
}
|
||||
sourceCell = sourceCells.nextCellInRow();
|
||||
targetCell = targetCells.nextCellInRow();
|
||||
}
|
||||
|
||||
if (!dryRun && sourceTableHash.scanBatch > 0) {
|
||||
if (put != null && put.size() >= sourceTableHash.scanBatch) {
|
||||
context.write(new ImmutableBytesWritable(rowKey), put);
|
||||
put = null;
|
||||
}
|
||||
if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
|
||||
context.write(new ImmutableBytesWritable(rowKey), delete);
|
||||
delete = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!dryRun) {
|
||||
if (put != null) {
|
||||
context.write(new ImmutableBytesWritable(rowKey), put);
|
||||
}
|
||||
if (delete != null) {
|
||||
context.write(new ImmutableBytesWritable(rowKey), delete);
|
||||
}
|
||||
}
|
||||
|
||||
if (matchingCells > 0) {
|
||||
context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
|
||||
}
|
||||
if (matchingRow) {
|
||||
context.getCounter(Counter.MATCHINGROWS).increment(1);
|
||||
return true;
|
||||
} else {
|
||||
context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static final CellComparator cellComparator = new CellComparator();
|
||||
/**
|
||||
* Compare row keys of the given Result objects.
|
||||
* Nulls are after non-nulls
|
||||
*/
|
||||
private static int compareRowKeys(byte[] r1, byte[] r2) {
|
||||
if (r1 == null) {
|
||||
return 1; // source missing row
|
||||
} else if (r2 == null) {
|
||||
return -1; // target missing row
|
||||
} else {
|
||||
return cellComparator.compareRows(r1, 0, r1.length, r2, 0, r2.length);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare families, qualifiers, and timestamps of the given Cells.
|
||||
* They are assumed to be of the same row.
|
||||
* Nulls are after non-nulls.
|
||||
*/
|
||||
private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
|
||||
if (c1 == null) {
|
||||
return 1; // source missing cell
|
||||
}
|
||||
if (c2 == null) {
|
||||
return -1; // target missing cell
|
||||
}
|
||||
|
||||
int result = CellComparator.compareFamilies(c1, c2);
|
||||
if (result != 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
result = CellComparator.compareQualifiers(c1, c2);
|
||||
if (result != 0) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// note timestamp comparison is inverted - more recent cells first
|
||||
return CellComparator.compareTimestamps(c1, c2);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanup(Context context)
|
||||
throws IOException, InterruptedException {
|
||||
if (mapperException == null) {
|
||||
try {
|
||||
finishRemainingHashRanges(context);
|
||||
} catch (Throwable t) {
|
||||
mapperException = t;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
sourceTable.close();
|
||||
targetTable.close();
|
||||
sourceConnection.close();
|
||||
targetConnection.close();
|
||||
} catch (Throwable t) {
|
||||
if (mapperException == null) {
|
||||
mapperException = t;
|
||||
} else {
|
||||
LOG.error("Suppressing exception from closing tables", t);
|
||||
}
|
||||
}
|
||||
|
||||
// propagate first exception
|
||||
if (mapperException != null) {
|
||||
Throwables.propagateIfInstanceOf(mapperException, IOException.class);
|
||||
Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
|
||||
Throwables.propagate(mapperException);
|
||||
}
|
||||
}
|
||||
|
||||
private void finishRemainingHashRanges(Context context) throws IOException,
|
||||
InterruptedException {
|
||||
TableSplit split = (TableSplit) context.getInputSplit();
|
||||
byte[] splitEndRow = split.getEndRow();
|
||||
boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
|
||||
|
||||
// if there are more hash batches that begin before the end of this split move to them
|
||||
while (nextSourceKey != null
|
||||
&& (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
|
||||
moveToNextBatch(context);
|
||||
}
|
||||
|
||||
if (targetHasher.isBatchStarted()) {
|
||||
// need to complete the final open hash batch
|
||||
|
||||
if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
|
||||
|| (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
|
||||
// the open hash range continues past the end of this region
|
||||
// add a scan to complete the current hash range
|
||||
Scan scan = sourceTableHash.initScan();
|
||||
scan.setStartRow(splitEndRow);
|
||||
if (nextSourceKey == null) {
|
||||
scan.setStopRow(sourceTableHash.stopRow);
|
||||
} else {
|
||||
scan.setStopRow(nextSourceKey.copyBytes());
|
||||
}
|
||||
|
||||
ResultScanner targetScanner = targetTable.getScanner(scan);
|
||||
for (Result row : targetScanner) {
|
||||
targetHasher.hashResult(row);
|
||||
}
|
||||
} // else current batch ends exactly at split end row
|
||||
|
||||
finishBatchAndCompareHashes(context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final int NUM_ARGS = 3;
|
||||
private static void printUsage(final String errorMsg) {
|
||||
if (errorMsg != null && errorMsg.length() > 0) {
|
||||
System.err.println("ERROR: " + errorMsg);
|
||||
System.err.println();
|
||||
}
|
||||
System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
|
||||
System.err.println();
|
||||
System.err.println("Options:");
|
||||
|
||||
System.err.println(" sourcezkcluster ZK cluster key of the source table");
|
||||
System.err.println(" (defaults to cluster in classpath's config)");
|
||||
System.err.println(" targetzkcluster ZK cluster key of the target table");
|
||||
System.err.println(" (defaults to cluster in classpath's config)");
|
||||
System.err.println(" dryrun if true, output counters but no writes");
|
||||
System.err.println(" (defaults to false)");
|
||||
System.err.println();
|
||||
System.err.println("Args:");
|
||||
System.err.println(" sourcehashdir path to HashTable output dir for source table");
|
||||
System.err.println(" if not specified, then all data will be scanned");
|
||||
System.err.println(" sourcetable Name of the source table to sync from");
|
||||
System.err.println(" targettable Name of the target table to sync to");
|
||||
System.err.println();
|
||||
System.err.println("Examples:");
|
||||
System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
|
||||
System.err.println(" to a local target cluster:");
|
||||
System.err.println(" $ bin/hbase " +
|
||||
"org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
|
||||
+ " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
|
||||
+ " hdfs://nn:9000/hashes/tableA tableA tableA");
|
||||
}
|
||||
|
||||
private boolean doCommandLine(final String[] args) {
|
||||
if (args.length < NUM_ARGS) {
|
||||
printUsage(null);
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
sourceHashDir = new Path(args[args.length - 3]);
|
||||
sourceTableName = args[args.length - 2];
|
||||
targetTableName = args[args.length - 1];
|
||||
|
||||
for (int i = 0; i < args.length - NUM_ARGS; i++) {
|
||||
String cmd = args[i];
|
||||
if (cmd.equals("-h") || cmd.startsWith("--h")) {
|
||||
printUsage(null);
|
||||
return false;
|
||||
}
|
||||
|
||||
final String sourceZkClusterKey = "--sourcezkcluster=";
|
||||
if (cmd.startsWith(sourceZkClusterKey)) {
|
||||
sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
|
||||
continue;
|
||||
}
|
||||
|
||||
final String targetZkClusterKey = "--targetzkcluster=";
|
||||
if (cmd.startsWith(targetZkClusterKey)) {
|
||||
targetZkCluster = cmd.substring(targetZkClusterKey.length());
|
||||
continue;
|
||||
}
|
||||
|
||||
final String dryRunKey = "--dryrun=";
|
||||
if (cmd.startsWith(dryRunKey)) {
|
||||
dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
|
||||
continue;
|
||||
}
|
||||
|
||||
printUsage("Invalid argument '" + cmd + "'");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
printUsage("Can't start because " + e.getMessage());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Main entry point.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
|
||||
System.exit(ret);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
|
||||
if (!doCommandLine(otherArgs)) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
Job job = createSubmittableJob(otherArgs);
|
||||
if (!job.waitForCompletion(true)) {
|
||||
LOG.info("Map-reduce job failed!");
|
||||
return 1;
|
||||
}
|
||||
counters = job.getCounters();
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,192 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.MapFile;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Basic test for the HashTable M/R tool
|
||||
*/
|
||||
@Category(LargeTests.class)
|
||||
public class TestHashTable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestHashTable.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
TEST_UTIL.startMiniMapReduceCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniMapReduceCluster();
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHashTable() throws Exception {
|
||||
final String tableName = "testHashTable";
|
||||
final byte[] family = Bytes.toBytes("family");
|
||||
final byte[] column1 = Bytes.toBytes("c1");
|
||||
final byte[] column2 = Bytes.toBytes("c2");
|
||||
final byte[] column3 = Bytes.toBytes("c3");
|
||||
|
||||
int numRows = 100;
|
||||
int numRegions = 10;
|
||||
int numHashFiles = 3;
|
||||
|
||||
byte[][] splitRows = new byte[numRegions-1][];
|
||||
for (int i = 1; i < numRegions; i++) {
|
||||
splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
|
||||
}
|
||||
|
||||
long timestamp = 1430764183454L;
|
||||
// put rows into the first table
|
||||
HTable t1 = TEST_UTIL.createTable(TableName.valueOf(tableName), family, splitRows);
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
Put p = new Put(Bytes.toBytes(i), timestamp);
|
||||
p.addColumn(family, column1, column1);
|
||||
p.addColumn(family, column2, column2);
|
||||
p.addColumn(family, column3, column3);
|
||||
t1.put(p);
|
||||
}
|
||||
t1.close();
|
||||
|
||||
HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
|
||||
|
||||
Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName);
|
||||
|
||||
long batchSize = 300;
|
||||
int code = hashTable.run(new String[] {
|
||||
"--batchsize=" + batchSize,
|
||||
"--numhashfiles=" + numHashFiles,
|
||||
"--scanbatch=2",
|
||||
tableName,
|
||||
testDir.toString()});
|
||||
assertEquals("test job failed", 0, code);
|
||||
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
|
||||
HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
|
||||
assertEquals(tableName, tableHash.tableName);
|
||||
assertEquals(batchSize, tableHash.batchSize);
|
||||
assertEquals(numHashFiles, tableHash.numHashFiles);
|
||||
assertEquals(numHashFiles - 1, tableHash.partitions.size());
|
||||
for (ImmutableBytesWritable bytes : tableHash.partitions) {
|
||||
LOG.debug("partition: " + Bytes.toInt(bytes.get()));
|
||||
}
|
||||
|
||||
ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes
|
||||
= ImmutableMap.<Integer, ImmutableBytesWritable>builder()
|
||||
.put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f")))
|
||||
.put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96")))
|
||||
.put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa")))
|
||||
.put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881")))
|
||||
.put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352")))
|
||||
.put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93")))
|
||||
.put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666")))
|
||||
.put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090")))
|
||||
.put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3")))
|
||||
.put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb")))
|
||||
.put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc")))
|
||||
.put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4")))
|
||||
.put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b")))
|
||||
.put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59")))
|
||||
.put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f")))
|
||||
.put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56")))
|
||||
.put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095")))
|
||||
.put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91")))
|
||||
.put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38")))
|
||||
.put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56")))
|
||||
.build();
|
||||
|
||||
Map<Integer, ImmutableBytesWritable> actualHashes
|
||||
= new HashMap<Integer, ImmutableBytesWritable>();
|
||||
Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
|
||||
for (int i = 0; i < numHashFiles; i++) {
|
||||
Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
|
||||
|
||||
MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf());
|
||||
ImmutableBytesWritable key = new ImmutableBytesWritable();
|
||||
ImmutableBytesWritable hash = new ImmutableBytesWritable();
|
||||
while(reader.next(key, hash)) {
|
||||
String keyString = Bytes.toHex(key.get(), key.getOffset(), key.getLength());
|
||||
LOG.debug("Key: " + (keyString.isEmpty() ? "-1" : Integer.parseInt(keyString, 16))
|
||||
+ " Hash: " + Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength()));
|
||||
|
||||
int intKey = -1;
|
||||
if (key.getLength() > 0) {
|
||||
intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength());
|
||||
}
|
||||
if (actualHashes.containsKey(intKey)) {
|
||||
Assert.fail("duplicate key in data files: " + intKey);
|
||||
}
|
||||
actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
|
||||
}
|
||||
reader.close();
|
||||
}
|
||||
|
||||
FileStatus[] files = fs.listStatus(testDir);
|
||||
for (FileStatus file : files) {
|
||||
LOG.debug("Output file: " + file.getPath());
|
||||
}
|
||||
|
||||
files = fs.listStatus(dataDir);
|
||||
for (FileStatus file : files) {
|
||||
LOG.debug("Data file: " + file.getPath());
|
||||
}
|
||||
|
||||
if (!expectedHashes.equals(actualHashes)) {
|
||||
LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
|
||||
}
|
||||
Assert.assertEquals(expectedHashes, actualHashes);
|
||||
|
||||
TEST_UTIL.deleteTable(tableName);
|
||||
TEST_UTIL.cleanupDataTestDirOnTestFS();
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,334 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mapreduce;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
|
||||
/**
|
||||
* Basic test for the SyncTable M/R tool
|
||||
*/
|
||||
@Category(LargeTests.class)
|
||||
public class TestSyncTable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestSyncTable.class);
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
TEST_UTIL.startMiniMapReduceCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniMapReduceCluster();
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private static byte[][] generateSplits(int numRows, int numRegions) {
|
||||
byte[][] splitRows = new byte[numRegions-1][];
|
||||
for (int i = 1; i < numRegions; i++) {
|
||||
splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
|
||||
}
|
||||
return splitRows;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSyncTable() throws Exception {
|
||||
String sourceTableName = "testSourceTable";
|
||||
String targetTableName = "testTargetTable";
|
||||
Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
|
||||
|
||||
writeTestData(sourceTableName, targetTableName);
|
||||
hashSourceTable(sourceTableName, testDir);
|
||||
Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
|
||||
assertEqualTables(90, sourceTableName, targetTableName);
|
||||
|
||||
assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
|
||||
assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
|
||||
assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
|
||||
assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
|
||||
assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
|
||||
assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
|
||||
|
||||
TEST_UTIL.deleteTable(sourceTableName);
|
||||
TEST_UTIL.deleteTable(targetTableName);
|
||||
TEST_UTIL.cleanupDataTestDirOnTestFS();
|
||||
}
|
||||
|
||||
private void assertEqualTables(int expectedRows, String sourceTableName, String targetTableName)
|
||||
throws Exception {
|
||||
Table sourceTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(sourceTableName));
|
||||
Table targetTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(targetTableName));
|
||||
|
||||
ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
|
||||
ResultScanner targetScanner = targetTable.getScanner(new Scan());
|
||||
|
||||
for (int i = 0; i < expectedRows; i++) {
|
||||
Result sourceRow = sourceScanner.next();
|
||||
Result targetRow = targetScanner.next();
|
||||
|
||||
LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
|
||||
+ " cells:" + sourceRow);
|
||||
LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
|
||||
+ " cells:" + targetRow);
|
||||
|
||||
if (sourceRow == null) {
|
||||
Assert.fail("Expected " + expectedRows
|
||||
+ " source rows but only found " + i);
|
||||
}
|
||||
if (targetRow == null) {
|
||||
Assert.fail("Expected " + expectedRows
|
||||
+ " target rows but only found " + i);
|
||||
}
|
||||
Cell[] sourceCells = sourceRow.rawCells();
|
||||
Cell[] targetCells = targetRow.rawCells();
|
||||
if (sourceCells.length != targetCells.length) {
|
||||
LOG.debug("Source cells: " + Arrays.toString(sourceCells));
|
||||
LOG.debug("Target cells: " + Arrays.toString(targetCells));
|
||||
Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
|
||||
+ " has " + sourceCells.length
|
||||
+ " cells in source table but " + targetCells.length
|
||||
+ " cells in target table");
|
||||
}
|
||||
for (int j = 0; j < sourceCells.length; j++) {
|
||||
Cell sourceCell = sourceCells[j];
|
||||
Cell targetCell = targetCells[j];
|
||||
try {
|
||||
if (!CellUtil.matchingRow(sourceCell, targetCell)) {
|
||||
Assert.fail("Rows don't match");
|
||||
}
|
||||
if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
|
||||
Assert.fail("Families don't match");
|
||||
}
|
||||
if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
|
||||
Assert.fail("Qualifiers don't match");
|
||||
}
|
||||
if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
|
||||
Assert.fail("Timestamps don't match");
|
||||
}
|
||||
if (!CellUtil.matchingValue(sourceCell, targetCell)) {
|
||||
Assert.fail("Values don't match");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
|
||||
Throwables.propagate(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
Result sourceRow = sourceScanner.next();
|
||||
if (sourceRow != null) {
|
||||
Assert.fail("Source table has more than " + expectedRows
|
||||
+ " rows. Next row: " + Bytes.toInt(sourceRow.getRow()));
|
||||
}
|
||||
Result targetRow = targetScanner.next();
|
||||
if (targetRow != null) {
|
||||
Assert.fail("Target table has more than " + expectedRows
|
||||
+ " rows. Next row: " + Bytes.toInt(targetRow.getRow()));
|
||||
}
|
||||
sourceScanner.close();
|
||||
targetScanner.close();
|
||||
sourceTable.close();
|
||||
targetTable.close();
|
||||
}
|
||||
|
||||
private Counters syncTables(String sourceTableName, String targetTableName,
|
||||
Path testDir) throws Exception {
|
||||
SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
|
||||
int code = syncTable.run(new String[] {
|
||||
testDir.toString(),
|
||||
sourceTableName,
|
||||
targetTableName
|
||||
});
|
||||
assertEquals("sync table job failed", 0, code);
|
||||
|
||||
LOG.info("Sync tables completed");
|
||||
return syncTable.counters;
|
||||
}
|
||||
|
||||
private void hashSourceTable(String sourceTableName, Path testDir)
|
||||
throws Exception, IOException {
|
||||
int numHashFiles = 3;
|
||||
long batchSize = 100; // should be 2 batches per region
|
||||
int scanBatch = 1;
|
||||
HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
|
||||
int code = hashTable.run(new String[] {
|
||||
"--batchsize=" + batchSize,
|
||||
"--numhashfiles=" + numHashFiles,
|
||||
"--scanbatch=" + scanBatch,
|
||||
sourceTableName,
|
||||
testDir.toString()});
|
||||
assertEquals("hash table job failed", 0, code);
|
||||
|
||||
FileSystem fs = TEST_UTIL.getTestFileSystem();
|
||||
|
||||
HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
|
||||
assertEquals(sourceTableName, tableHash.tableName);
|
||||
assertEquals(batchSize, tableHash.batchSize);
|
||||
assertEquals(numHashFiles, tableHash.numHashFiles);
|
||||
assertEquals(numHashFiles - 1, tableHash.partitions.size());
|
||||
|
||||
LOG.info("Hash table completed");
|
||||
}
|
||||
|
||||
private void writeTestData(String sourceTableName, String targetTableName)
|
||||
throws Exception {
|
||||
final byte[] family = Bytes.toBytes("family");
|
||||
final byte[] column1 = Bytes.toBytes("c1");
|
||||
final byte[] column2 = Bytes.toBytes("c2");
|
||||
final byte[] value1 = Bytes.toBytes("val1");
|
||||
final byte[] value2 = Bytes.toBytes("val2");
|
||||
final byte[] value3 = Bytes.toBytes("val3");
|
||||
|
||||
int numRows = 100;
|
||||
int sourceRegions = 10;
|
||||
int targetRegions = 6;
|
||||
|
||||
HTable sourceTable = TEST_UTIL.createTable(TableName.valueOf(sourceTableName),
|
||||
family, generateSplits(numRows, sourceRegions));
|
||||
|
||||
HTable targetTable = TEST_UTIL.createTable(TableName.valueOf(targetTableName),
|
||||
family, generateSplits(numRows, targetRegions));
|
||||
|
||||
long timestamp = 1430764183454L;
|
||||
|
||||
int rowIndex = 0;
|
||||
// a bunch of identical rows
|
||||
for (; rowIndex < 40; rowIndex++) {
|
||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||
sourcePut.addColumn(family, column1, timestamp, value1);
|
||||
sourcePut.addColumn(family, column2, timestamp, value2);
|
||||
sourceTable.put(sourcePut);
|
||||
|
||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||
targetPut.addColumn(family, column1, timestamp, value1);
|
||||
targetPut.addColumn(family, column2, timestamp, value2);
|
||||
targetTable.put(targetPut);
|
||||
}
|
||||
// some rows only in the source table
|
||||
// ROWSWITHDIFFS: 10
|
||||
// TARGETMISSINGROWS: 10
|
||||
// TARGETMISSINGCELLS: 20
|
||||
for (; rowIndex < 50; rowIndex++) {
|
||||
Put put = new Put(Bytes.toBytes(rowIndex));
|
||||
put.addColumn(family, column1, timestamp, value1);
|
||||
put.addColumn(family, column2, timestamp, value2);
|
||||
sourceTable.put(put);
|
||||
}
|
||||
// some rows only in the target table
|
||||
// ROWSWITHDIFFS: 10
|
||||
// SOURCEMISSINGROWS: 10
|
||||
// SOURCEMISSINGCELLS: 20
|
||||
for (; rowIndex < 60; rowIndex++) {
|
||||
Put put = new Put(Bytes.toBytes(rowIndex));
|
||||
put.addColumn(family, column1, timestamp, value1);
|
||||
put.addColumn(family, column2, timestamp, value2);
|
||||
targetTable.put(put);
|
||||
}
|
||||
// some rows with 1 missing cell in target table
|
||||
// ROWSWITHDIFFS: 10
|
||||
// TARGETMISSINGCELLS: 10
|
||||
for (; rowIndex < 70; rowIndex++) {
|
||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||
sourcePut.addColumn(family, column1, timestamp, value1);
|
||||
sourcePut.addColumn(family, column2, timestamp, value2);
|
||||
sourceTable.put(sourcePut);
|
||||
|
||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||
targetPut.addColumn(family, column1, timestamp, value1);
|
||||
targetTable.put(targetPut);
|
||||
}
|
||||
// some rows with 1 missing cell in source table
|
||||
// ROWSWITHDIFFS: 10
|
||||
// SOURCEMISSINGCELLS: 10
|
||||
for (; rowIndex < 80; rowIndex++) {
|
||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||
sourcePut.addColumn(family, column1, timestamp, value1);
|
||||
sourceTable.put(sourcePut);
|
||||
|
||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||
targetPut.addColumn(family, column1, timestamp, value1);
|
||||
targetPut.addColumn(family, column2, timestamp, value2);
|
||||
targetTable.put(targetPut);
|
||||
}
|
||||
// some rows differing only in timestamp
|
||||
// ROWSWITHDIFFS: 10
|
||||
// SOURCEMISSINGCELLS: 20
|
||||
// TARGETMISSINGCELLS: 20
|
||||
for (; rowIndex < 90; rowIndex++) {
|
||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||
sourcePut.addColumn(family, column1, timestamp, column1);
|
||||
sourcePut.addColumn(family, column2, timestamp, value2);
|
||||
sourceTable.put(sourcePut);
|
||||
|
||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||
targetPut.addColumn(family, column1, timestamp+1, column1);
|
||||
targetPut.addColumn(family, column2, timestamp-1, value2);
|
||||
targetTable.put(targetPut);
|
||||
}
|
||||
// some rows with different values
|
||||
// ROWSWITHDIFFS: 10
|
||||
// DIFFERENTCELLVALUES: 20
|
||||
for (; rowIndex < numRows; rowIndex++) {
|
||||
Put sourcePut = new Put(Bytes.toBytes(rowIndex));
|
||||
sourcePut.addColumn(family, column1, timestamp, value1);
|
||||
sourcePut.addColumn(family, column2, timestamp, value2);
|
||||
sourceTable.put(sourcePut);
|
||||
|
||||
Put targetPut = new Put(Bytes.toBytes(rowIndex));
|
||||
targetPut.addColumn(family, column1, timestamp, value3);
|
||||
targetPut.addColumn(family, column2, timestamp, value3);
|
||||
targetTable.put(targetPut);
|
||||
}
|
||||
|
||||
sourceTable.close();
|
||||
targetTable.close();
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue