HBASE-5604 M/R tool to replay WAL files
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1325555 13f79535-47bb-0310-9956-ffa450edef68
parent fc9faf8585
commit 0717d13416
@@ -148,7 +148,24 @@
       This page currently exists on the website and will eventually be migrated into the RefGuide.
       </para>
    </section>
-   <section xml:id="rowcounter">
+   <section xml:id="walplayer">
+      <title>WALPlayer</title>
+      <para>WALPlayer is a utility to replay WAL files into HBase.
+      </para>
+      <para>The WAL can be replayed for a set of tables or all tables, and a time range can be provided (in milliseconds). The WAL is filtered to this set of tables. The output can optionally be mapped to another set of tables.
+      </para>
+      <para>WALPlayer can also generate HFiles for later bulk importing; in that case only a single table and no mapping can be specified.
+      </para>
+      <para>Invoke via:
+<programlisting>$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer [options] <wal inputdir> <tables> [<tableMappings>]
+</programlisting>
+      </para>
+      <para>For example:
+<programlisting>$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer /backuplogdir oldTable1,oldTable2 newTable1,newTable2
+</programlisting>
+      </para>
+   </section>
+   <section xml:id="rowcounter">
       <title>RowCounter</title>
       <para>RowCounter is a utility that will count all the rows of a table. This is a good utility to use
       as a sanity check to ensure that HBase can read all the blocks of a table if there are any concerns of metadata inconsistency.
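As a sketch of how WALPlayer's bulk-load and time-range options combine on the command line (the options themselves are printed by the tool's usage(), shown further below; the paths /backuplogdir and /tmp/walplayer-hfiles and the table name table1 are illustrative placeholders, not taken from the commit), a run that replays only a time window of one table into HFiles might look like:

<programlisting># Replay only edits for table1 written inside the given time range and emit HFiles
# for a later bulk load instead of applying the edits to a live cluster.
# Times may be given as milliseconds or in the form yyyy-MM-dd'T'HH:mm:ss.SS (see setupTime()).
$ bin/hbase org.apache.hadoop.hbase.mapreduce.WALPlayer \
    -Dhlog.bulk.output=/tmp/walplayer-hfiles \
    -Dhlog.start.time=2012-04-01T00:00:00.00 \
    -Dhlog.end.time=2012-04-10T00:00:00.00 \
    /backuplogdir table1
</programlisting>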
@@ -0,0 +1,268 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Simple {@link InputFormat} for {@link HLog} files.
 */
@InterfaceAudience.Public
public class HLogInputFormat extends InputFormat<HLogKey, WALEdit> {
  private static Log LOG = LogFactory.getLog(HLogInputFormat.class);

  public static String START_TIME_KEY = "hlog.start.time";
  public static String END_TIME_KEY = "hlog.end.time";

  /**
   * {@link InputSplit} for {@link HLog} files. Each split represents
   * exactly one log file.
   */
  static class HLogSplit extends InputSplit implements Writable {
    private String logFileName;
    private long fileSize;
    private long startTime;
    private long endTime;

    /** for serialization */
    public HLogSplit() {}

    /**
     * Represent an HLogSplit, i.e. a single HLog file.
     * Start- and EndTime are managed by the split, so that HLog files can be
     * filtered before WALEdits are passed to the mapper(s).
     * @param logFileName
     * @param fileSize
     * @param startTime
     * @param endTime
     */
    public HLogSplit(String logFileName, long fileSize, long startTime, long endTime) {
      this.logFileName = logFileName;
      this.fileSize = fileSize;
      this.startTime = startTime;
      this.endTime = endTime;
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return fileSize;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      // TODO: Find the data node with the most blocks for this HLog?
      return new String[] {};
    }

    public String getLogFileName() {
      return logFileName;
    }

    public long getStartTime() {
      return startTime;
    }

    public long getEndTime() {
      return endTime;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      logFileName = in.readUTF();
      fileSize = in.readLong();
      startTime = in.readLong();
      endTime = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(logFileName);
      out.writeLong(fileSize);
      out.writeLong(startTime);
      out.writeLong(endTime);
    }

    @Override
    public String toString() {
      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
    }
  }

  /**
   * {@link RecordReader} for an {@link HLog} file.
   */
  static class HLogRecordReader extends RecordReader<HLogKey, WALEdit> {
    private HLog.Reader reader = null;
    private HLog.Entry currentEntry = new HLog.Entry();
    private long startTime;
    private long endTime;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      HLogSplit hsplit = (HLogSplit)split;
      Path logFile = new Path(hsplit.getLogFileName());
      Configuration conf = context.getConfiguration();
      LOG.info("Opening reader for " + split);
      try {
        this.reader = HLog.getReader(logFile.getFileSystem(conf), logFile, conf);
      } catch (EOFException x) {
        LOG.info("Ignoring corrupted HLog file: " + logFile
            + " (This is normal when a RegionServer crashed.)");
      }
      this.startTime = hsplit.getStartTime();
      this.endTime = hsplit.getEndTime();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (reader == null) return false;

      HLog.Entry temp;
      long i = -1;
      do {
        // skip older entries
        try {
          temp = reader.next(currentEntry);
          i++;
        } catch (EOFException x) {
          LOG.info("Corrupted entry detected. Ignoring the rest of the file."
              + " (This is normal when a RegionServer crashed.)");
          return false;
        }
      } while (temp != null && temp.getKey().getWriteTime() < startTime);

      if (temp == null) {
        if (i > 0) LOG.info("Skipped " + i + " entries.");
        LOG.info("Reached end of file.");
        return false;
      } else if (i > 0) {
        LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
      }
      boolean res = temp.getKey().getWriteTime() <= endTime;
      if (!res) {
        LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
      }
      return res;
    }

    @Override
    public HLogKey getCurrentKey() throws IOException, InterruptedException {
      return currentEntry.getKey();
    }

    @Override
    public WALEdit getCurrentValue() throws IOException, InterruptedException {
      return currentEntry.getEdit();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // N/A depends on total number of entries, which is unknown
      return 0;
    }

    @Override
    public void close() throws IOException {
      LOG.info("Closing reader");
      if (reader != null) this.reader.close();
    }
  }

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException,
      InterruptedException {
    Configuration conf = context.getConfiguration();
    Path inputDir = new Path(conf.get("mapred.input.dir"));

    long startTime = conf.getLong(START_TIME_KEY, Long.MIN_VALUE);
    long endTime = conf.getLong(END_TIME_KEY, Long.MAX_VALUE);

    FileSystem fs = inputDir.getFileSystem(conf);
    List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);

    List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
    for (FileStatus file : files) {
      splits.add(new HLogSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
    }
    return splits;
  }

  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
      throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    LOG.debug("Scanning " + dir.toString() + " for HLog files");

    FileStatus[] files = fs.listStatus(dir);
    if (files == null) return Collections.emptyList();
    for (FileStatus file : files) {
      if (file.isDir()) {
        // recurse into sub directories
        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
      } else {
        String name = file.getPath().toString();
        int idx = name.lastIndexOf('.');
        if (idx > 0) {
          try {
            long fileStartTime = Long.parseLong(name.substring(idx+1));
            if (fileStartTime <= endTime) {
              LOG.info("Found: " + name);
              result.add(file);
            }
          } catch (NumberFormatException x) {
            idx = 0;
          }
        }
        if (idx == 0) {
          LOG.warn("File " + name + " does not appear to be an HLog file. Skipping...");
        }
      }
    }
    return result;
  }

  @Override
  public RecordReader<HLogKey, WALEdit> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new HLogRecordReader();
  }
}
@@ -0,0 +1,309 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A tool to replay WAL files as a M/R job.
 * The WAL can be replayed for a set of tables or all tables,
 * and a time range can be provided (in milliseconds).
 * The WAL is filtered to the passed set of tables and the output
 * can optionally be mapped to another set of tables.
 *
 * WAL replay can also generate HFiles for later bulk importing,
 * in that case the WAL is replayed for a single table only.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class WALPlayer extends Configured implements Tool {
  final static String NAME = "WALPlayer";
  final static String BULK_OUTPUT_CONF_KEY = "hlog.bulk.output";
  final static String HLOG_INPUT_KEY = "hlog.input.dir";
  final static String TABLES_KEY = "hlog.input.tables";
  final static String TABLE_MAP_KEY = "hlog.input.tablesmap";

  /**
   * A mapper that just writes out KeyValues.
   * This one can be used together with {@link KeyValueSortReducer}
   */
  static class HLogKeyValueMapper
      extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue> {
    private byte[] table;

    @Override
    public void map(HLogKey key, WALEdit value,
        Context context)
        throws IOException {
      try {
        // skip all other tables
        if (Bytes.equals(table, key.getTablename())) {
          for (KeyValue kv : value.getKeyValues()) {
            if (HLog.isMetaFamily(kv.getFamily())) continue;
            context.write(new ImmutableBytesWritable(kv.getRow()), kv);
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      // only a single table is supported when HFiles are generated with HFileOutputFormat
      String tables[] = context.getConfiguration().getStrings(TABLES_KEY);
      if (tables == null || tables.length != 1) {
        // this can only happen when HLogKeyValueMapper is used directly by a class other than WALPlayer
        throw new IOException("Exactly one table must be specified for bulk HFile case.");
      }
      table = Bytes.toBytes(tables[0]);
    }
  }

  /**
   * A mapper that writes out {@link Mutation} to be directly applied to
   * a running HBase instance.
   */
  static class HLogMapper
      extends Mapper<HLogKey, WALEdit, ImmutableBytesWritable, Mutation> {
    private Map<byte[], byte[]> tables = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);

    @Override
    public void map(HLogKey key, WALEdit value,
        Context context)
        throws IOException {
      try {
        if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
          byte[] targetTable = tables.isEmpty() ?
              key.getTablename() :
              tables.get(key.getTablename());
          ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable);
          Put put = null;
          Delete del = null;
          KeyValue lastKV = null;
          for (KeyValue kv : value.getKeyValues()) {
            // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
            if (HLog.isMetaFamily(kv.getFamily())) continue;

            // A WALEdit may contain multiple operations (HBASE-3584) and/or
            // multiple rows (HBASE-5229).
            // Aggregate as much as possible into a single Put/Delete
            // operation before writing to the context.
            if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
              // row or type changed, write out aggregate KVs.
              if (put != null) context.write(tableOut, put);
              if (del != null) context.write(tableOut, del);

              if (kv.isDelete()) {
                del = new Delete(kv.getRow());
              } else {
                put = new Put(kv.getRow());
              }
            }
            if (kv.isDelete()) {
              del.addDeleteMarker(kv);
            } else {
              put.add(kv);
            }
            lastKV = kv;
          }
          // write residual KVs
          if (put != null) context.write(tableOut, put);
          if (del != null) context.write(tableOut, del);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void setup(Context context) throws IOException {
      String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
      String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
      if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
        // this can only happen when HLogMapper is used directly by a class other than WALPlayer
        throw new IOException("No tables or incorrect table mapping specified.");
      }
      int i = 0;
      for (String table : tablesToUse) {
        tables.put(Bytes.toBytes(table), Bytes.toBytes(tableMap[i++]));
      }
    }
  }

  /**
   * @param conf The {@link Configuration} to use.
   */
  public WALPlayer(Configuration conf) {
    super(conf);
  }

  void setupTime(Configuration conf, String option) throws IOException {
    String val = conf.get(option);
    if (val == null) return;
    long ms;
    try {
      // first try to parse in user friendly form
      ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
    } catch (ParseException pe) {
      try {
        // then see if just a number of milliseconds was specified
        ms = Long.parseLong(val);
      } catch (NumberFormatException nfe) {
        throw new IOException(option
            + " must be specified either in the form 2001-02-20T16:35:06.99 "
            + "or as number of milliseconds");
      }
    }
    conf.setLong(option, ms);
  }

  /**
   * Sets up the actual job.
   *
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public Job createSubmittableJob(String[] args)
      throws IOException {
    Configuration conf = getConf();
    setupTime(conf, HLogInputFormat.START_TIME_KEY);
    setupTime(conf, HLogInputFormat.END_TIME_KEY);
    Path inputDir = new Path(args[0]);
    String[] tables = args[1].split(",");
    String[] tableMap;
    if (args.length > 2) {
      tableMap = args[2].split(",");
      if (tableMap.length != tables.length) {
        throw new IOException("The same number of tables and mapping must be provided.");
      }
    } else {
      // if no mapping is specified map each table to itself
      tableMap = tables;
    }
    conf.setStrings(TABLES_KEY, tables);
    conf.setStrings(TABLE_MAP_KEY, tableMap);
    Job job = new Job(conf, NAME + "_" + inputDir);
    job.setJarByClass(WALPlayer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(HLogInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
      // the bulk HFile case
      if (tables.length != 1) {
        throw new IOException("Exactly one table must be specified for the bulk export option");
      }
      HTable table = new HTable(conf, tables[0]);
      job.setMapperClass(HLogKeyValueMapper.class);
      job.setReducerClass(KeyValueSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(KeyValue.class);
      HFileOutputFormat.configureIncrementalLoad(job, table);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
          com.google.common.base.Preconditions.class);
    } else {
      // output to live cluster
      job.setMapperClass(HLogMapper.class);
      job.setOutputFormatClass(MultiTableOutputFormat.class);
      TableMapReduceUtil.addDependencyJars(job);
      // No reducers.
      job.setNumReduceTasks(0);
    }
    return job;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
    System.err.println("Read all WAL entries for <tables>.");
    System.err.println("If no tables (\"\") are specified, all tables are imported.");
    System.err.println("(Careful, even -ROOT- and .META. entries will be imported in that case.)");
    System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
    System.err.println("The WAL entries can be mapped to a new set of tables via <tableMapping>.");
    System.err.println("<tableMapping> is a comma separated list of target tables.");
    System.err.println("If specified, each table in <tables> must have a mapping.\n");
    System.err.println("By default " + NAME + " will load data directly into HBase.");
    System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
    System.err.println("  (Only one table can be specified, and no mapping is allowed!)");
    System.err.println("Other options: (specify time range of WAL edits to consider)");
    System.err.println("  -D" + HLogInputFormat.START_TIME_KEY + "=[date|ms]");
    System.err.println("  -D" + HLogInputFormat.END_TIME_KEY + "=[date|ms]");
    System.err.println("For performance also consider the following options:\n"
        + "  -Dmapred.map.tasks.speculative.execution=false\n"
        + "  -Dmapred.reduce.tasks.speculative.execution=false");
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (otherArgs.length < 2) {
      usage("Wrong number of arguments: " + otherArgs.length);
      System.exit(-1);
    }
    Job job = createSubmittableJob(otherArgs);
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
@@ -0,0 +1,240 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * JUnit tests for the HLogRecordReader
 */
@Category(MediumTests.class)
public class TestHLogRecordReader {
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Configuration conf;
  private static FileSystem fs;
  private static Path hbaseDir;
  private static final byte [] tableName = Bytes.toBytes(getName());
  private static final byte [] rowName = tableName;
  private static final HRegionInfo info = new HRegionInfo(tableName,
      Bytes.toBytes(""), Bytes.toBytes(""), false);
  private static final byte [] family = Bytes.toBytes("column");
  private static final byte [] value = Bytes.toBytes("value");
  private static HTableDescriptor htd;
  private static Path logDir;
  private static Path oldLogDir;

  private static String getName() {
    return "TestHLogRecordReader";
  }

  @Before
  public void setUp() throws Exception {
    FileStatus[] entries = fs.listStatus(hbaseDir);
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.replication", 1);
    TEST_UTIL.startMiniDFSCluster(1);

    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();
    logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
    oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(family));
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test partial reads from the log based on passed time range
   * @throws Exception
   */
  @Test
  public void testPartialRead() throws Exception {
    HLog log = new HLog(fs, logDir, oldLogDir, conf);
    long ts = System.currentTimeMillis();
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
        ts, value));
    log.append(info, tableName, edit,
        ts, htd);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
        ts+1, value));
    log.append(info, tableName, edit,
        ts+1, htd);
    log.rollWriter();

    Thread.sleep(1);
    long ts1 = System.currentTimeMillis();

    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"),
        ts1+1, value));
    log.append(info, tableName, edit,
        ts1+1, htd);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"),
        ts1+2, value));
    log.append(info, tableName, edit,
        ts1+2, htd);
    log.close();

    HLogInputFormat input = new HLogInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapred.input.dir", logDir.toString());
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts);

    // only 1st file is considered, and only its 1st entry is used
    List<InputSplit> splits = input.getSplits(new JobContext(jobConf, new JobID()));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    jobConf.setLong(HLogInputFormat.START_TIME_KEY, ts+1);
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts1+1);
    splits = input.getSplits(new JobContext(jobConf, new JobID()));
    // both files need to be considered
    assertEquals(2, splits.size());
    // only the 2nd entry from the 1st file is used
    testSplit(splits.get(0), Bytes.toBytes("2"));
    // only the 1st entry from the 2nd file is used
    testSplit(splits.get(1), Bytes.toBytes("3"));
  }

  /**
   * Test basic functionality
   * @throws Exception
   */
  @Test
  public void testHLogRecordReader() throws Exception {
    HLog log = new HLog(fs, logDir, oldLogDir, conf);
    byte [] value = Bytes.toBytes("value");
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
        System.currentTimeMillis(), value));
    log.append(info, tableName, edit,
        System.currentTimeMillis(), htd);

    Thread.sleep(1); // make sure 2nd log gets a later timestamp
    long secondTs = System.currentTimeMillis();
    log.rollWriter();

    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
        System.currentTimeMillis(), value));
    log.append(info, tableName, edit,
        System.currentTimeMillis(), htd);
    log.close();
    long thirdTs = System.currentTimeMillis();

    // should have 2 log files now
    HLogInputFormat input = new HLogInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapred.input.dir", logDir.toString());

    // make sure both logs are found
    List<InputSplit> splits = input.getSplits(new JobContext(jobConf, new JobID()));
    assertEquals(2, splits.size());

    // should return exactly one KV
    testSplit(splits.get(0), Bytes.toBytes("1"));
    // same for the 2nd split
    testSplit(splits.get(1), Bytes.toBytes("2"));

    // now test basic time ranges:

    // set an endtime, the 2nd log file can be ignored completely.
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, secondTs-1);
    splits = input.getSplits(new JobContext(jobConf, new JobID()));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    // now set a start time
    jobConf.setLong(HLogInputFormat.END_TIME_KEY, Long.MAX_VALUE);
    jobConf.setLong(HLogInputFormat.START_TIME_KEY, thirdTs);
    splits = input.getSplits(new JobContext(jobConf, new JobID()));
    // both logs need to be considered
    assertEquals(2, splits.size());
    // but both readers skip all edits
    testSplit(splits.get(0));
    testSplit(splits.get(1));
  }

  /**
   * Create a new reader from the split, and match the edits against the passed columns.
   */
  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
    HLogRecordReader reader = new HLogRecordReader();
    reader.initialize(split, new TaskAttemptContext(conf, new TaskAttemptID()));

    for (byte[] column : columns) {
      assertTrue(reader.nextKeyValue());
      assertTrue(Bytes
          .equals(column, reader.getCurrentValue().getKeyValues().get(0).getQualifier()));
    }
    assertFalse(reader.nextKeyValue());
    reader.close();
  }

  @org.junit.Rule
  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
      new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}
@@ -0,0 +1,121 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Basic test for the WALPlayer M/R tool
 */
public class TestWALPlayer {
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static MiniHBaseCluster cluster;

  @BeforeClass
  public static void beforeClass() throws Exception {
    cluster = TEST_UTIL.startMiniCluster();
    TEST_UTIL.startMiniMapReduceCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    TEST_UTIL.shutdownMiniMapReduceCluster();
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Simple end-to-end test
   * @throws Exception
   */
  @Test
  public void testWALPlayer() throws Exception {
    final byte[] TABLENAME1 = Bytes.toBytes("testWALPlayer1");
    final byte[] TABLENAME2 = Bytes.toBytes("testWALPlayer2");
    final byte[] FAMILY = Bytes.toBytes("family");
    final byte[] COLUMN1 = Bytes.toBytes("c1");
    final byte[] COLUMN2 = Bytes.toBytes("c2");
    final byte[] ROW = Bytes.toBytes("row");
    HTable t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
    HTable t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);

    // put a row into the first table
    Put p = new Put(ROW);
    p.add(FAMILY, COLUMN1, COLUMN1);
    p.add(FAMILY, COLUMN2, COLUMN2);
    t1.put(p);
    // delete one column
    Delete d = new Delete(ROW);
    d.deleteColumns(FAMILY, COLUMN1);
    t1.delete(d);

    // replay the WAL, map table 1 to table 2
    HLog log = cluster.getRegionServer(0).getWAL();
    log.rollWriter();
    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
        .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();

    WALPlayer player = new WALPlayer(TEST_UTIL.getConfiguration());
    assertEquals(0, player.run(new String[] { walInputDir, Bytes.toString(TABLENAME1),
        Bytes.toString(TABLENAME2) }));

    // verify the WAL was replayed into table 2
    Get g = new Get(ROW);
    Result r = t2.get(g);
    assertEquals(1, r.size());
    assertTrue(Bytes.equals(COLUMN2, r.raw()[0].getQualifier()));
  }

  /**
   * Simple test for data parsing
   * @throws Exception
   */
  @Test
  public void testTimeFormat() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    WALPlayer player = new WALPlayer(TEST_UTIL.getConfiguration());
    player.setupTime(conf, HLogInputFormat.END_TIME_KEY);
    // make sure if nothing is specified nothing is set
    assertNull(conf.get(HLogInputFormat.END_TIME_KEY));
    // test a textual date (including ms)
    conf.set(HLogInputFormat.END_TIME_KEY, "2012-4-10T14:21:01.01");
    player.setupTime(conf, HLogInputFormat.END_TIME_KEY);
    assertEquals(1334092861001L, conf.getLong(HLogInputFormat.END_TIME_KEY, 0));
    // test with ms as a long
    conf.set(HLogInputFormat.END_TIME_KEY, "1334092861010");
    player.setupTime(conf, HLogInputFormat.END_TIME_KEY);
    assertEquals(1334092861010L, conf.getLong(HLogInputFormat.END_TIME_KEY, 0));
  }
}