HBASE-13895 DATALOSS: Region assigned before WAL replay when abort (Enis Soztutar) -- REAPPLY
parent f0e29c49a1
commit 20e855f282
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown by the region server when it is aborting.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RegionServerAbortedException extends RegionServerStoppedException {
+  public RegionServerAbortedException(String s) {
+    super(s);
+  }
+}
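Editor's note: the sketch below is not part of the commit. It only illustrates, with a simplified and hypothetical caller, how the new RegionServerAbortedException is meant to be handled differently from its parent RegionServerStoppedException; the AssignmentManager hunk further down retries the unassign instead of offlining the region when the server is aborting, because the region may still need WAL replay. The closeRegionRpc callback is an assumed stand-in for the real region-close RPC.

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;

public class CloseRetrySketch {
  /** Hypothetical stand-in for the region-close RPC a master-side caller would issue. */
  interface CloseRpc {
    void closeRegionRpc() throws IOException;
  }

  /** Returns true when the close should be retried later rather than offlining the region. */
  static boolean shouldRetryClose(CloseRpc rpc) throws IOException {
    try {
      rpc.closeRegionRpc();
      return false;
    } catch (RegionServerAbortedException e) {
      // The server is aborting: its regions may still need WAL replay, so keep the region
      // assigned and retry after the failed-server expiry, as AssignmentManager now does.
      return true;
    } catch (RegionServerStoppedException e) {
      // The server stopped cleanly: it is safe to offline the region and reassign it.
      return false;
    }
  }
}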
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 /**
  * Thrown by the region server when it is in shutting down state.
+ * @see RegionServerAbortedException
  */
 @SuppressWarnings("serial")
 @InterfaceAudience.Public
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.Random;
 import java.util.Set;
-import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -31,8 +30,11 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -67,6 +69,19 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -91,6 +106,9 @@ import com.google.common.collect.Sets;
  */
 @Category(IntegrationTests.class)
 public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestLoadAndVerify.class);
+
   private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
   private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
   private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");
@@ -112,7 +130,10 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
 
   private static final int SCANNER_CACHING = 500;
+
+  private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
 
   private String toRun = null;
+  private String keysDir = null;
 
   private enum Counters {
     ROWS_WRITTEN,
@@ -267,7 +288,6 @@ public void cleanUpCluster() throws Exception {
   }
 
   public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
-    private static final Log LOG = LogFactory.getLog(VerifyReducer.class);
     private Counter refsChecked;
     private Counter rowsWritten;
 
@@ -312,6 +332,7 @@ public void cleanUpCluster() throws Exception {
 
   protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
     Path outputDir = getTestDir(TEST_NAME, "load-output");
+    LOG.info("Load output dir: " + outputDir);
 
     NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
     conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());
@@ -339,6 +360,7 @@ public void cleanUpCluster() throws Exception {
 
   protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
     Path outputDir = getTestDir(TEST_NAME, "verify-output");
+    LOG.info("Verify output dir: " + outputDir);
 
     Job job = Job.getInstance(conf);
     job.setJarByClass(this.getClass());
@@ -363,6 +385,139 @@ public void cleanUpCluster() throws Exception {
     assertEquals(0, numOutputRecords);
   }
 
+  /**
+   * Tool to search missing rows in WALs and hfiles.
+   * Pass in file or dir of keys to search for. Key file must have been written by Verify step
+   * (we depend on the format it writes out. We'll read them in and then search in hbase
+   * WALs and oldWALs dirs (Some of this is TODO).
+   */
+  public static class WALSearcher extends WALPlayer {
+    public WALSearcher(Configuration conf) {
+      super(conf);
+    }
+
+    /**
+     * The actual searcher mapper.
+     */
+    public static class WALMapperSearcher extends WALMapper {
+      private SortedSet<byte []> keysToFind;
+      private AtomicInteger rows = new AtomicInteger(0);
+
+      @Override
+      public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
+          throws IOException {
+        super.setup(context);
+        try {
+          this.keysToFind = readKeysToSearch(context.getConfiguration());
+          LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(e.toString());
+        }
+      }
+
+      @Override
+      protected boolean filter(Context context, Cell cell) {
+        // TODO: Can I do a better compare than this copying out key?
+        byte [] row = new byte [cell.getRowLength()];
+        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
+        boolean b = this.keysToFind.contains(row);
+        if (b) {
+          String keyStr = Bytes.toStringBinary(row);
+          try {
+            LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
+          } catch (IOException|InterruptedException e) {
+            LOG.warn(e);
+          }
+          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+            context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+          }
+          context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
+        }
+        return b;
+      }
+    }
+
+    // Put in place the above WALMapperSearcher.
+    @Override
+    public Job createSubmittableJob(String[] args) throws IOException {
+      Job job = super.createSubmittableJob(args);
+      // Call my class instead.
+      job.setJarByClass(WALMapperSearcher.class);
+      job.setMapperClass(WALMapperSearcher.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+      return job;
+    }
+  }
+
+  static final String FOUND_GROUP_KEY = "Found";
+  static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
+
+  static SortedSet<byte []> readKeysToSearch(final Configuration conf)
+      throws IOException, InterruptedException {
+    Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
+    FileSystem fs = FileSystem.get(conf);
+    SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+    if (!fs.exists(keysInputDir)) {
+      throw new FileNotFoundException(keysInputDir.toString());
+    }
+    if (!fs.isDirectory(keysInputDir)) {
+      FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
+      readFileToSearch(conf, fs, keyFileStatus, result);
+    } else {
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
+      while(iterator.hasNext()) {
+        LocatedFileStatus keyFileStatus = iterator.next();
+        // Skip "_SUCCESS" file.
+        if (keyFileStatus.getPath().getName().startsWith("_")) continue;
+        readFileToSearch(conf, fs, keyFileStatus, result);
+      }
+    }
+    return result;
+  }
+
+  private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
+      final FileSystem fs, final FileStatus keyFileStatus, SortedSet<byte []> result)
+      throws IOException,
+      InterruptedException {
+    // verify uses file output format and writes <Text, Text>. We can read it as a text file
+    try (InputStream in = fs.open(keyFileStatus.getPath());
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+      // extract out the key and return that missing as a missing key
+      String line;
+      while ((line = reader.readLine()) != null) {
+        if (line.isEmpty()) continue;
+
+        String[] parts = line.split("\\s+");
+        if (parts.length >= 1) {
+          String key = parts[0];
+          result.add(Bytes.toBytesBinary(key));
+        } else {
+          LOG.info("Cannot parse key from: " + line);
+        }
+      }
+    }
+    return result;
+  }
+
+  private int doSearch(Configuration conf, String keysDir) throws Exception {
+    Path inputDir = new Path(keysDir);
+
+    getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
+    SortedSet<byte []> keys = readKeysToSearch(getConf());
+    if (keys.isEmpty()) throw new RuntimeException("No keys to find");
+    LOG.info("Count of keys to find: " + keys.size());
+    for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
+    Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+    // Now read all WALs. In two dirs. Presumes certain layout.
+    Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+    Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    LOG.info("Running Search with keys inputDir=" + inputDir +
+      " against " + getConf().get(HConstants.HBASE_DIR));
+    int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
+    if (ret != 0) return ret;
+    return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
+  }
+
   private static void setJobScannerConf(Job job) {
     // Make sure scanners log something useful to make debugging possible.
     job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
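Editor's note: the usage sketch below is not part of the commit. It shows one assumed way to drive the new search mode added above, which reads the keys emitted by the verify step and then runs WALSearcher over the WALs and oldWALs directories. The package name org.apache.hadoop.hbase.test and the /tmp/verify-output keys directory are assumptions for illustration.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.test.IntegrationTestLoadAndVerify;
import org.apache.hadoop.util.ToolRunner;

public class SearchModeSketch {
  public static void main(String[] args) throws Exception {
    // Roughly equivalent to:
    //   hbase org.apache.hadoop.hbase.test.IntegrationTestLoadAndVerify search <keysDir>
    // The first argument picks the mode; the second is the directory of keys written by verify.
    int ret = ToolRunner.run(HBaseConfiguration.create(), new IntegrationTestLoadAndVerify(),
        new String[] { "search", "/tmp/verify-output" });  // hypothetical keys dir
    System.exit(ret);
  }
}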
@@ -371,11 +526,8 @@ public void cleanUpCluster() throws Exception {
   }
 
   public Path getTestDir(String testName, String subdir) throws IOException {
-    //HBaseTestingUtility.getDataTestDirOnTestFs() has not been backported.
+    Path testDir = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = FileSystem.get(getConf());
-    Path base = new Path(fs.getWorkingDirectory(), "test-data");
-    String randomStr = UUID.randomUUID().toString();
-    Path testDir = new Path(base, randomStr);
     fs.deleteOnExit(testDir);
 
     return new Path(new Path(testDir, testName), subdir);
@@ -398,7 +550,8 @@ public void cleanUpCluster() throws Exception {
   }
 
   public void usage() {
-    System.err.println(this.getClass().getSimpleName() + " [-Doptions] <load|verify|loadAndVerify>");
+    System.err.println(this.getClass().getSimpleName()
+        + " [-Doptions] <load|verify|loadAndVerify|search>");
     System.err.println(" Loads a table with row dependencies and verifies the dependency chains");
     System.err.println("Options");
     System.err.println(" -Dloadmapper.table=<name> Table to write/verify (default autogen)");
@@ -417,11 +570,16 @@ public void cleanUpCluster() throws Exception {
     super.processOptions(cmd);
 
     String[] args = cmd.getArgs();
-    if (args == null || args.length < 1 || args.length > 1) {
+    if (args == null || args.length < 1) {
       usage();
       throw new RuntimeException("Incorrect Number of args.");
     }
     toRun = args[0];
+    if (toRun.equalsIgnoreCase("search")) {
+      if (args.length > 1) {
+        keysDir = args[1];
+      }
+    }
   }
 
   @Override
@@ -429,16 +587,25 @@ public void cleanUpCluster() throws Exception {
     IntegrationTestingUtility.setUseDistributedCluster(getConf());
     boolean doLoad = false;
     boolean doVerify = false;
+    boolean doSearch = false;
     boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter",true);
     int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);
 
-    if (toRun.equals("load")) {
+    if (toRun.equalsIgnoreCase("load")) {
       doLoad = true;
-    } else if (toRun.equals("verify")) {
+    } else if (toRun.equalsIgnoreCase("verify")) {
       doVerify= true;
-    } else if (toRun.equals("loadAndVerify")) {
+    } else if (toRun.equalsIgnoreCase("loadAndVerify")) {
       doLoad=true;
       doVerify= true;
+    } else if (toRun.equalsIgnoreCase("search")) {
+      doLoad=false;
+      doVerify= false;
+      doSearch = true;
+      if (keysDir == null) {
+        System.err.println("Usage: search <KEYS_DIR>]");
+        return 1;
+      }
     } else {
       System.err.println("Invalid argument " + toRun);
       usage();
@@ -450,9 +617,9 @@ public void cleanUpCluster() throws Exception {
     HTableDescriptor htd = new HTableDescriptor(table);
     htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
 
+    if (doLoad) {
       try (Connection conn = ConnectionFactory.createConnection(getConf());
           Admin admin = conn.getAdmin()) {
-      if (doLoad) {
         admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
         doLoad(getConf(), htd);
       }
@@ -463,6 +630,9 @@ public void cleanUpCluster() throws Exception {
       getTestingUtil(getConf()).deleteTable(htd.getTableName());
     }
     }
+    if (doSearch) {
+      return doSearch(getConf(), keysDir);
+    }
     return 0;
   }
 
@@ -85,6 +85,10 @@ public class WALPlayer extends Configured implements Tool {
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
+  protected WALPlayer(final Configuration c) {
+    super(c);
+  }
+
   /**
    * A mapper that just writes out KeyValues.
    * This one can be used together with {@link KeyValueSortReducer}
@@ -327,7 +331,7 @@ public class WALPlayer extends Configured implements Tool {
    * @throws Exception When running the job fails.
    */
   public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(HBaseConfiguration.create(), new WALPlayer(), args);
+    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
     System.exit(ret);
   }
 
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.quotas.RegionStateListener;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -890,10 +891,18 @@ public class AssignmentManager {
           LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
             region.getRegionNameAsString());
       } catch (Throwable t) {
+        long sleepTime = 0;
+        Configuration conf = this.server.getConfiguration();
         if (t instanceof RemoteException) {
           t = ((RemoteException)t).unwrapRemoteException();
         }
-        if (t instanceof NotServingRegionException
+        if (t instanceof RegionServerAbortedException) {
+          // RS is aborting, we cannot offline the region since the region may need to do WAL
+          // recovery. Until we see the RS expiration, we should retry.
+          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+            RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+
+        } else if (t instanceof NotServingRegionException
             || t instanceof RegionServerStoppedException
             || t instanceof ServerNotRunningYetException) {
           LOG.debug("Offline " + region.getRegionNameAsString()
@@ -903,24 +912,22 @@
         } else if (t instanceof FailedServerException && i < maximumAttempts) {
           // In case the server is in the failed server list, no point to
           // retry too soon. Retry after the failed_server_expiry time
-          try {
-            Configuration conf = this.server.getConfiguration();
-            long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
+          sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
           if (LOG.isDebugEnabled()) {
-            LOG.debug(server + " is on failed server list; waiting "
-              + sleepTime + "ms", t);
+            LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
           }
-          Thread.sleep(sleepTime);
+        }
+        try {
+          if (sleepTime > 0) {
+            Thread.sleep(sleepTime);
+          }
         } catch (InterruptedException ie) {
-          LOG.warn("Failed to unassign "
-            + region.getRegionNameAsString() + " since interrupted", ie);
-          regionStates.updateRegionState(region, State.FAILED_CLOSE);
+          LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
           Thread.currentThread().interrupt();
+          regionStates.updateRegionState(region, State.FAILED_CLOSE);
           return;
         }
-        }
-
         LOG.info("Server " + server + " returned " + t + " for "
           + region.getRegionNameAsString() + ", try=" + i
           + " of " + this.maximumAttempts, t);
@@ -1036,9 +1036,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @throws IOException
    */
  protected void checkOpen() throws IOException {
-    if (regionServer.isStopped() || regionServer.isAborted()) {
-      throw new RegionServerStoppedException("Server " + regionServer.serverName
-        + " not running" + (regionServer.isAborted() ? ", aborting" : ""));
+    if (regionServer.isAborted()) {
+      throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
+    }
+    if (regionServer.isStopped()) {
+      throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
     }
     if (!regionServer.fsOk) {
       throw new RegionServerStoppedException("File system not available");
@@ -114,7 +114,7 @@ public class TestWALPlayer {
         .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
 
     Configuration configuration= TEST_UTIL.getConfiguration();
-    WALPlayer player = new WALPlayer();
+    WALPlayer player = new WALPlayer(configuration);
     String optionName="_test_.name";
     configuration.set(optionName, "1000");
     player.setupTime(configuration, optionName);
@@ -982,12 +982,6 @@ public class TestAssignmentManagerOnCluster {
       assertTrue(regionStates.isRegionOnline(hri));
       assertEquals(oldServerName, regionStates.getRegionServerOfRegion(hri));
 
-      // Try to unassign the dead region before SSH
-      am.unassign(hri);
-      // The region should be moved to offline since the server is dead
-      RegionState state = regionStates.getRegionState(hri);
-      assertTrue(state.isOffline());
-
       // Kill the hosting server, which doesn't have meta on it.
       cluster.killRegionServer(oldServerName);
       cluster.waitForRegionServerToStop(oldServerName, -1);
@@ -1061,12 +1055,6 @@
       assertTrue(regionStates.isRegionOnline(hri));
       assertEquals(oldServerName, regionStates.getRegionServerOfRegion(hri));
 
-      // Try to unassign the dead region before SSH
-      am.unassign(hri);
-      // The region should be moved to offline since the server is dead
-      RegionState state = regionStates.getRegionState(hri);
-      assertTrue(state.isOffline());
-
       // Disable the table now.
       master.disableTable(hri.getTable());
 