HBASE-22976 [HBCK2] Add RecoveredEditsPlayer (#2504)

Make it so WALPlayer can replay recovered.edits files.

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
 Allow for WAL files that do NOT have a start time in their name.
 Use the 'generic' WAL-filename parser instead of the one that
 used to be local here. Implement support for the 'startTime' filter,
 which previously was simply not implemented.

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
 Checkstyle.

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
 Use the new general WAL name timestamp parser.

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
 Utility for parsing a timestamp from a WAL filename (see the sketch after this file list).

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
 Export attributes about the local recovered.edits test file
 so other tests can play with it.
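
For reference, a minimal sketch (not part of this patch; the class name and the file names below
are made-up examples) of what the new WAL.getTimestamp() helper is expected to return, read off
the implementation added in WAL.java further down:

import org.apache.hadoop.hbase.wal.WAL;

public class TimestampParseSketch {
  public static void main(String[] args) {
    // Plain WAL name: the trailing dot-separated token is the start-time timestamp.
    System.out.println(WAL.getTimestamp("10.20.20.171%3A60020.1277499063250")); // 1277499063250
    // Meta WAL: a '.meta' suffix follows the timestamp, so the parser looks one token back.
    System.out.println(WAL.getTimestamp("10.20.20.171%3A60020.1277499063250.meta")); // 1277499063250
    // recovered.edits file: the name is a sequence id with no '.' separator, so no timestamp is
    // found and -1 comes back; WALInputFormat.addFile() then keeps such files unfiltered.
    System.out.println(WAL.getTimestamp("0000000000000016310")); // -1
  }
}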

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Michael Stack, 2020-10-09 08:46:05 -07:00, committed by GitHub
parent 78ae1f176d
commit 665a8767a0
10 changed files with 263 additions and 123 deletions

CommonFSUtils.java

@@ -364,7 +364,8 @@ public final class CommonFSUtils {
     if (!qualifiedWalDir.equals(rootDir)) {
       if (qualifiedWalDir.toString().startsWith(rootDir.toString() + "/")) {
         throw new IllegalStateException("Illegal WAL directory specified. " +
-          "WAL directories are not permitted to be under the root directory if set.");
+          "WAL directories are not permitted to be under root directory: rootDir=" +
+          rootDir.toString() + ", qualifiedWALDir=" + qualifiedWalDir);
       }
     }
     return true;

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -22,24 +22,21 @@ import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -49,6 +46,9 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
@@ -77,10 +77,6 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
      * Represent an WALSplit, i.e. a single WAL file.
      * Start- and EndTime are managed by the split, so that WAL files can be
      * filtered before WALEdits are passed to the mapper(s).
-     * @param logFileName
-     * @param fileSize
-     * @param startTime
-     * @param endTime
      */
     public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
       this.logFileName = logFileName;
@@ -186,7 +182,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (reader == null) return false;
+      if (reader == null) {
+        return false;
+      }
       this.currentPos = reader.getPosition();
       Entry temp;
       long i = -1;
@@ -204,7 +202,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
       } while (temp != null && temp.getKey().getWriteTime() < startTime);

       if (temp == null) {
-        if (i > 0) LOG.info("Skipped " + i + " entries.");
+        if (i > 0) {
+          LOG.info("Skipped " + i + " entries.");
+        }
         LOG.info("Reached end of file.");
         return false;
       } else if (i > 0) {
@@ -242,7 +242,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
     @Override
     public void close() throws IOException {
       LOG.info("Closing reader");
-      if (reader != null) this.reader.close();
+      if (reader != null) {
+        this.reader.close();
+      }
     }
   }

@@ -301,40 +303,56 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
         inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
   }

+  /**
+   * @param startTime If file looks like it has a timestamp in its name, we'll check if newer
+   *          or equal to this value else we will filter out the file. If name does not
+   *          seem to have a timestamp, we will just return it w/o filtering.
+   * @param endTime If file looks like it has a timestamp in its name, we'll check if older or
+   *          equal to this value else we will filter out the file. If name does not seem to
+   *          have a timestamp, we will just return it w/o filtering.
+   */
   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
       throws IOException {
     List<FileStatus> result = new ArrayList<>();
     LOG.debug("Scanning " + dir.toString() + " for WAL files");
     RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
-    if (!iter.hasNext()) return Collections.emptyList();
+    if (!iter.hasNext()) {
+      return Collections.emptyList();
+    }
     while (iter.hasNext()) {
       LocatedFileStatus file = iter.next();
       if (file.isDirectory()) {
-        // recurse into sub directories
+        // Recurse into sub directories
         result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
       } else {
-        String name = file.getPath().toString();
-        int idx = name.lastIndexOf('.');
-        if (idx > 0) {
-          try {
-            long fileStartTime = Long.parseLong(name.substring(idx+1));
-            if (fileStartTime <= endTime) {
-              LOG.info("Found: " + file);
-              result.add(file);
-            }
-          } catch (NumberFormatException x) {
-            idx = 0;
-          }
-        }
-        if (idx == 0) {
-          LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
-        }
+        addFile(result, file, startTime, endTime);
       }
     }
+    // TODO: These results should be sorted? Results could be content of recovered.edits directory
+    // -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or timestamp and
+    // then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator
     return result;
   }

+  static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime,
+      long endTime) {
+    long timestamp = WAL.getTimestamp(lfs.getPath().getName());
+    if (timestamp > 0) {
+      // Looks like a valid timestamp.
+      if (timestamp <= endTime && timestamp >= startTime) {
+        LOG.info("Found {}", lfs.getPath());
+        result.add(lfs);
+      } else {
+        LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(),
+          startTime, Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime));
+      }
+    } else {
+      // If no timestamp, add it regardless.
+      LOG.info("Found (no-timestamp!) {}", lfs);
+      result.add(lfs);
+    }
+  }
+
   @Override
   public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
       TaskAttemptContext context) throws IOException, InterruptedException {

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -239,6 +238,7 @@ public class WALPlayer extends Configured implements Tool {
       super.cleanup(context);
     }

+    @SuppressWarnings("checkstyle:EmptyBlock")
     @Override
     public void setup(Context context) throws IOException {
       String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
@@ -377,17 +377,21 @@ public class WALPlayer extends Configured implements Tool {
     System.err.println(" <WAL inputdir> directory of WALs to replay.");
     System.err.println(" <tables> comma separated list of tables. If no tables specified,");
     System.err.println(" all are imported (even hbase:meta if present).");
-    System.err.println(" <tableMappings> WAL entries can be mapped to a new set of tables by passing");
-    System.err.println(" <tableMappings>, a comma separated list of target tables.");
-    System.err.println(" If specified, each table in <tables> must have a mapping.");
+    System.err.println(" <tableMappings> WAL entries can be mapped to a new set of tables by " +
+      "passing");
+    System.err.println(" <tableMappings>, a comma separated list of target " +
+      "tables.");
+    System.err.println(" If specified, each table in <tables> must have a " +
+      "mapping.");
     System.err.println("To generate HFiles to bulk load instead of loading HBase directly, pass:");
     System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
     System.err.println(" Only one table can be specified, and no mapping allowed!");
     System.err.println("To specify a time range, pass:");
     System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
     System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
-    System.err.println(" The start and the end date of timerange. The dates can be expressed");
-    System.err.println(" in milliseconds since epoch or in yyyy-MM-dd'T'HH:mm:ss.SS format.");
+    System.err.println(" The start and the end date of timerange (inclusive). The dates can be");
+    System.err.println(" expressed in milliseconds-since-epoch or yyyy-MM-dd'T'HH:mm:ss.SS " +
+      "format.");
     System.err.println(" E.g. 1234567890120 or 2009-02-13T23:32:30.12");
     System.err.println("Other options:");
     System.err.println(" -D" + JOB_NAME_CONF_KEY + "=jobName");

TestWALInputFormat.java (new file)

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@Category({ MapReduceTests.class, SmallTests.class})
public class TestWALInputFormat {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALInputFormat.class);
/**
* Test the primitive start/end time filtering.
*/
@Test
public void testAddFile() {
List<FileStatus> lfss = new ArrayList<>();
LocatedFileStatus lfs = Mockito.mock(LocatedFileStatus.class);
long now = System.currentTimeMillis();
Mockito.when(lfs.getPath()).thenReturn(new Path("/name." + now));
WALInputFormat.addFile(lfss, lfs, now, now);
assertEquals(1, lfss.size());
WALInputFormat.addFile(lfss, lfs, now - 1, now - 1);
assertEquals(1, lfss.size());
WALInputFormat.addFile(lfss, lfs, now - 2, now - 1);
assertEquals(1, lfss.size());
WALInputFormat.addFile(lfss, lfs, now - 2, now);
assertEquals(2, lfss.size());
WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, now);
assertEquals(3, lfss.size());
WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE);
assertEquals(4, lfss.size());
WALInputFormat.addFile(lfss, lfs, now, now + 2);
assertEquals(5, lfss.size());
WALInputFormat.addFile(lfss, lfs, now + 1, now + 2);
assertEquals(5, lfss.size());
Mockito.when(lfs.getPath()).thenReturn(new Path("/name"));
WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE);
assertEquals(6, lfss.size());
Mockito.when(lfs.getPath()).thenReturn(new Path("/name.123"));
WALInputFormat.addFile(lfss, lfs, Long.MIN_VALUE, Long.MAX_VALUE);
assertEquals(7, lfss.size());
Mockito.when(lfs.getPath()).thenReturn(new Path("/name." + now + ".meta"));
WALInputFormat.addFile(lfss, lfs, now, now);
assertEquals(8, lfss.size());
}
}

TestWALPlayer.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -24,8 +24,8 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import org.apache.hadoop.conf.Configuration;
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
+import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -73,7 +74,6 @@ import org.mockito.stubbing.Answer;
  */
 @Category({MapReduceTests.class, LargeTests.class})
 public class TestWALPlayer {
-
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestWALPlayer.class);
@@ -91,7 +91,7 @@ public class TestWALPlayer {
   @BeforeClass
   public static void beforeClass() throws Exception {
-    conf= TEST_UTIL.getConfiguration();
+    conf = TEST_UTIL.getConfiguration();
     rootDir = TEST_UTIL.createRootDir();
     walRootDir = TEST_UTIL.createWALRootDir();
     fs = CommonFSUtils.getRootDirFileSystem(conf);
@@ -106,9 +106,32 @@ public class TestWALPlayer {
     logFs.delete(walRootDir, true);
   }

+  /**
+   * Test that WALPlayer can replay recovered.edits files.
+   */
+  @Test
+  public void testPlayingRecoveredEdit() throws Exception {
+    TableName tn = TableName.valueOf(TestRecoveredEdits.RECOVEREDEDITS_TABLENAME);
+    TEST_UTIL.createTable(tn, TestRecoveredEdits.RECOVEREDEDITS_COLUMNFAMILY);
+    // Copy testing recovered.edits file that is over under hbase-server test resources
+    // up into a dir in our little hdfs cluster here.
+    String hbaseServerTestResourcesEdits = System.getProperty("test.build.classes") +
+      "/../../../hbase-server/src/test/resources/" +
+      TestRecoveredEdits.RECOVEREDEDITS_PATH.getName();
+    assertTrue(new File(hbaseServerTestResourcesEdits).exists());
+    FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+    // Target dir.
+    Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory());
+    assertTrue(dfs.mkdirs(targetDir));
+    dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir);
+    assertEquals(0,
+      ToolRunner.run(new WALPlayer(this.conf), new String [] {targetDir.toString()}));
+    // I don't know how many edits are in this file for this table... so just check more than 1.
+    assertTrue(TEST_UTIL.countRows(tn) > 0);
+  }
+
   /**
    * Simple end-to-end test
-   * @throws Exception
    */
   @Test
   public void testWALPlayer() throws Exception {

TestWALRecordReader.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.mapreduce;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -123,8 +123,7 @@ public class TestWALRecordReader {
   }

   /**
-   * Test partial reads from the log based on passed time range
-   * @throws Exception
+   * Test partial reads from the WALs based on passed time range.
    */
   @Test
   public void testPartialRead() throws Exception {
@@ -140,6 +139,7 @@ public class TestWALRecordReader {
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
     log.appendData(info, getWalKeyImpl(ts+1, scopes), edit);
     log.sync();
+    Threads.sleep(10);
     LOG.info("Before 1st WAL roll " + log.toString());
     log.rollWriter();
     LOG.info("Past 1st WAL roll " + log.toString());
@@ -164,26 +164,29 @@ public class TestWALRecordReader {
     jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
     jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);

-    // only 1st file is considered, and only its 1st entry is used
+    // Only 1st file is considered, and only its 1st entry is in-range.
     List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
     assertEquals(1, splits.size());
     testSplit(splits.get(0), Bytes.toBytes("1"));

-    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1);
     jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1);
     splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
-    // both files need to be considered
     assertEquals(2, splits.size());
-    // only the 2nd entry from the 1st file is used
-    testSplit(splits.get(0), Bytes.toBytes("2"));
-    // only the 1nd entry from the 2nd file is used
+    // Both entries from first file are in-range.
+    testSplit(splits.get(0), Bytes.toBytes("1"), Bytes.toBytes("2"));
+    // Only the 1st entry from the 2nd file is in-range.
     testSplit(splits.get(1), Bytes.toBytes("3"));
+
+    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    assertEquals(1, splits.size());
+    // Only the 1st entry from the 2nd file is in-range.
+    testSplit(splits.get(0), Bytes.toBytes("3"));
   }

   /**
    * Test basic functionality
-   * @throws Exception
    */
   @Test
   public void testWALRecordReader() throws Exception {
@@ -234,11 +237,7 @@ public class TestWALRecordReader {
     jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
     jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
     splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
-    // both logs need to be considered
-    assertEquals(2, splits.size());
-    // but both readers skip all edits
-    testSplit(splits.get(0));
-    testSplit(splits.get(1));
+    assertTrue(splits.isEmpty());
   }

   /**
@@ -346,4 +345,4 @@ public class TestWALRecordReader {
     }
     reader.close();
   }
 }

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.wal;

-import static org.apache.commons.lang3.StringUtils.isNumeric;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -439,17 +438,12 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
      * @return start time
      */
     private static long getTS(Path p) {
-      String name = p.getName();
-      String [] splits = name.split("\\.");
-      String ts = splits[splits.length - 1];
-      if (!isNumeric(ts)) {
-        // Its a '.meta' or a '.syncrep' suffix.
-        ts = splits[splits.length - 2];
-      }
-      return Long.parseLong(ts);
+      return WAL.getTimestamp(p.getName());
     }
   }

   public static boolean isArchivedLogFile(Path p) {
     String oldLog = Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR;
     return p.toString().contains(oldLog);

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java

@@ -32,6 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;

 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import static org.apache.commons.lang3.StringUtils.isNumeric;

 /**
  * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
@@ -299,4 +300,32 @@ public interface WAL extends Closeable, WALFileLengthProvider {
       return this.key + "=" + this.edit;
     }
   }
+
+  /**
+   * Split a WAL filename to get a start time. WALs usually have the time we start writing to them
+   * as part of their name, usually the suffix. Sometimes there will be an extra suffix as when it
+   * is a WAL for the meta table. For example, WALs might look like this
+   * <code>10.20.20.171%3A60020.1277499063250</code> where <code>1277499063250</code> is the
+   * timestamp. Could also be a meta WAL which adds a '.meta' suffix or a synchronous replication
+   * WAL which adds a '.syncrep' suffix. Check for these. File also may have no timestamp on it.
+   * For example the recovered.edits files are WALs but are named in ascending order. Here is an
+   * example: 0000000000000016310. Allow for this.
+   * @param name Name of the WAL file.
+   * @return Timestamp or -1.
+   */
+  public static long getTimestamp(String name) {
+    String [] splits = name.split("\\.");
+    if (splits.length <= 1) {
+      return -1;
+    }
+    String timestamp = splits[splits.length - 1];
+    if (!isNumeric(timestamp)) {
+      // Its a '.meta' or a '.syncrep' suffix.
+      timestamp = splits[splits.length - 2];
+      if (!isNumeric(timestamp)) {
+        return -1;
+      }
+    }
+    return Long.parseLong(timestamp);
+  }
 }

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -78,6 +78,32 @@ public class TestRecoveredEdits {
   @Rule public TestName testName = new TestName();

+  /**
+   * Path to a recovered.edits file in hbase-server test resources folder.
+   * This is a little fragile getting this path to a file of 10M of edits.
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public static final Path RECOVEREDEDITS_PATH = new Path(
+    System.getProperty("test.build.classes", "target/test-classes"),
+    "0000000000000016310");
+
+  /**
+   * Name of table referenced by edits in the recovered.edits file.
+   */
+  public static final String RECOVEREDEDITS_TABLENAME = "IntegrationTestBigLinkedList";
+
+  /**
+   * Column family referenced by edits in the recovered.edits file.
+   */
+  public static final byte [] RECOVEREDEDITS_COLUMNFAMILY = Bytes.toBytes("meta");
+
+  public static final byte[][] RECOVEREDITS_COLUMNFAMILY_ARRAY =
+    new byte[][] {RECOVEREDEDITS_COLUMNFAMILY};
+
+  public static final ColumnFamilyDescriptor RECOVEREDEDITS_CFD =
+    ColumnFamilyDescriptorBuilder.newBuilder(RECOVEREDEDITS_COLUMNFAMILY).build();
+
+  /**
+   * Name of table mentioned edits from recovered.edits
+   */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     blockCache = BlockCacheFactory.createBlockCache(TEST_UTIL.getConfiguration());
@@ -102,13 +128,9 @@ public class TestRecoveredEdits {
     // Set it so we flush every 1M or so. Thats a lot.
     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
     conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(policy).toLowerCase());
-    // The file of recovered edits has a column family of 'meta'.
-    final String columnFamily = "meta";
-    byte[][] columnFamilyAsByteArray = new byte[][] { Bytes.toBytes(columnFamily) };
-    TableDescriptor tableDescriptor = TableDescriptorBuilder
-      .newBuilder(TableName.valueOf(testName.getMethodName())).setColumnFamily(
-        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build())
-      .build();
+    TableDescriptor tableDescriptor = TableDescriptorBuilder.
+      newBuilder(TableName.valueOf(testName.getMethodName())).
+      setColumnFamily(RECOVEREDEDITS_CFD) .build();
     RegionInfo hri = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
     final String encodedRegionName = hri.getEncodedName();
     Path hbaseRootDir = TEST_UTIL.getDataTestDir();
@@ -123,24 +145,20 @@ public class TestRecoveredEdits {
     HRegion region = HBaseTestingUtility
       .createRegionAndWAL(hri, hbaseRootDir, conf, tableDescriptor, blockCache);
     assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName());
-    List<String> storeFiles = region.getStoreFileList(columnFamilyAsByteArray);
+    List<String> storeFiles = region.getStoreFileList(RECOVEREDITS_COLUMNFAMILY_ARRAY);
     // There should be no store files.
     assertTrue(storeFiles.isEmpty());
     region.close();
     Path regionDir = FSUtils.getRegionDirFromRootDir(hbaseRootDir, hri);
     Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir);
-    // This is a little fragile getting this path to a file of 10M of edits.
-    Path recoveredEditsFile = new Path(
-      System.getProperty("test.build.classes", "target/test-classes"),
-      "0000000000000016310");
     // Copy this file under the region's recovered.edits dir so it is replayed on reopen.
-    Path destination = new Path(recoveredEditsDir, recoveredEditsFile.getName());
-    fs.copyToLocalFile(recoveredEditsFile, destination);
+    Path destination = new Path(recoveredEditsDir, RECOVEREDEDITS_PATH.getName());
+    fs.copyToLocalFile(RECOVEREDEDITS_PATH, destination);
     assertTrue(fs.exists(destination));
     // Now the file 0000000000000016310 is under recovered.edits, reopen the region to replay.
     region = HRegion.openHRegion(region, null);
     assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName());
-    storeFiles = region.getStoreFileList(columnFamilyAsByteArray);
+    storeFiles = region.getStoreFileList(RECOVEREDITS_COLUMNFAMILY_ARRAY);
     // Our 0000000000000016310 is 10MB. Most of the edits are for one region. Lets assume that if
     // we flush at 1MB, that there are at least 3 flushed files that are there because of the
     // replay of edits.
@@ -150,19 +168,16 @@ public class TestRecoveredEdits {
       assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10);
     }
     // Now verify all edits made it into the region.
-    int count = verifyAllEditsMadeItIn(fs, conf, recoveredEditsFile, region);
+    int count = verifyAllEditsMadeItIn(fs, conf, RECOVEREDEDITS_PATH, region);
+    assertTrue(count > 0);
     LOG.info("Checked " + count + " edits made it in");
   }

   /**
-   * @param fs
-   * @param conf
-   * @param edits
-   * @param region
    * @return Return how many edits seen.
-   * @throws IOException
    */
-  private int verifyAllEditsMadeItIn(final FileSystem fs, final Configuration conf,
+  // Used by TestWALPlayer over in hbase-mapreduce too.
+  public static int verifyAllEditsMadeItIn(final FileSystem fs, final Configuration conf,
       final Path edits, final HRegion region) throws IOException {
     int count = 0;
     // Read all cells from recover edits
HBase Reference Guide (asciidoc source, ops_mgt chapter)

@@ -424,32 +424,11 @@ See <<hfile_tool>>.
 For bulk replaying WAL files or _recovered.edits_ files, see
 <<walplayer>>. For reading/verifying individual files, read on.

-[[hlog_tool]]
-==== FSHLog tool
-
-The main method on `FSHLog` offers manual split and dump facilities.
-Pass it WALs or the product of a split, the content of the _recovered.edits_.
-directory.
-
-You can get a textual dump of a WAL file content by doing the following:
-
-----
- $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal.FSHLog --dump hdfs://example.org:8020/hbase/WALs/example.org,60020,1283516293161/10.10.21.10%3A60020.1283973724012
-----
-
-The return code will be non-zero if there are any issues with the file so you can test wholesomeness of file by redirecting `STDOUT` to `/dev/null` and testing the program return.
-
-Similarly you can force a split of a log file directory by doing:
-
-----
- $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal.FSHLog --split hdfs://example.org:8020/hbase/WALs/example.org,60020,1283516293161/
-----
-
 [[hlog_tool.prettyprint]]
-===== WALPrettyPrinter
+==== WALPrettyPrinter

-The `WALPrettyPrinter` is a tool with configurable options to print the contents of a WAL.
-You can invoke it via the HBase cli with the 'wal' command.
+The `WALPrettyPrinter` is a tool with configurable options to print the contents of a WAL
+or a _recovered.edits_ file. You can invoke it via the HBase cli with the 'wal' command.

 ----
  $ ./bin/hbase wal hdfs://example.org:8020/hbase/WALs/example.org,60020,1283516293161/10.10.21.10%3A60020.1283973724012
@@ -904,7 +883,10 @@ The output can optionally be mapped to another set of tables.

 WALPlayer can also generate HFiles for later bulk importing, in that case only a single table and no mapping can be specified.

-.WALPrettyPrinter/FSHLog Tool
+Finally, you can use WALPlayer to replay the content of a Regions `recovered.edits` directory (the files under
+`recovered.edits` directory have the same format as WAL files).
+
+.WALPrettyPrinter
 [NOTE]
 ====
 To read or verify single WAL files or _recovered.edits_ files, since they share the WAL format,
@@ -945,8 +927,8 @@ To generate HFiles to bulk load instead of loading HBase directly, pass:
 To specify a time range, pass:
   -Dwal.start.time=[date|ms]
   -Dwal.end.time=[date|ms]
-  The start and the end date of timerange. The dates can be expressed
-  in milliseconds since epoch or in yyyy-MM-dd'T'HH:mm:ss.SS format.
+  The start and the end date of timerange (inclusive). The dates can be
+  expressed in milliseconds-since-epoch or yyyy-MM-dd'T'HH:mm:ss.SS format.
   E.g. 1234567890120 or 2009-02-13T23:32:30.12
 Other options:
   -Dmapreduce.job.name=jobName
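
Not part of the patch: a minimal sketch of driving WALPlayer against a directory of
recovered.edits files from code, mirroring what the new testPlayingRecoveredEdit test does. The
class name, the HDFS path and the time-range values are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class ReplayRecoveredEditsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Optional inclusive time range; recovered.edits file names carry no timestamp, so they are
    // never filtered out by name -- only the individual entries are checked against the range.
    conf.setLong("wal.start.time", 0L);
    conf.setLong("wal.end.time", Long.MAX_VALUE);
    // Single argument: the directory holding recovered.edits files (or WALs). With no tables
    // given, all edits found are replayed.
    int exitCode = ToolRunner.run(conf, new WALPlayer(conf),
      new String[] { "hdfs://namenode:8020/staging/recovered-edits" });
    System.exit(exitCode);
  }
}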