HBASE-13895 DATALOSS: Region assigned before WAL replay when abort (Enis Soztutar)

This commit is contained in:
stack 2015-07-01 17:30:16 -07:00
parent 93b0b41603
commit 0f17b76796
5 changed files with 291 additions and 35 deletions
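In short: while a region server was aborting, checkOpen() threw RegionServerStoppedException, which the master's AssignmentManager took as license to offline the region and reassign it immediately, before the aborted server's WAL had been split and replayed. Unflushed edits were then invisible at the new location. This patch adds RegionServerAbortedException so the master can tell an abort from a clean stop, and on abort it retries the close until the server expiration is processed instead of offlining the region.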

View File: RegionServerStoppedException.java

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Thrown by the region server when it is shutting down.
* @see RegionServerAbortedException
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public

View File: IntegrationTestLoadAndVerify.java

@ -26,8 +26,11 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -37,23 +40,26 @@ import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.NMapInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@ -62,14 +68,22 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -92,6 +106,9 @@ import java.util.regex.Pattern;
*/
@Category(IntegrationTests.class)
public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
private static final Log LOG = LogFactory.getLog(IntegrationTestLoadAndVerify.class);
private static final String TEST_NAME = "IntegrationTestLoadAndVerify";
private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("q1");
@ -113,7 +130,10 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase {
private static final int SCANNER_CACHING = 500;
private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
private String toRun = null;
private String keysDir = null;
private enum Counters {
ROWS_WRITTEN,
@ -268,7 +288,6 @@ public void cleanUpCluster() throws Exception {
}
public static class VerifyReducer extends Reducer<BytesWritable, BytesWritable, Text, Text> {
private static final Log LOG = LogFactory.getLog(VerifyReducer.class);
private Counter refsChecked;
private Counter rowsWritten;
@ -313,6 +332,7 @@ public void cleanUpCluster() throws Exception {
protected Job doLoad(Configuration conf, HTableDescriptor htd) throws Exception {
Path outputDir = getTestDir(TEST_NAME, "load-output");
LOG.info("Load output dir: " + outputDir);
NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT));
conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString());
@ -340,6 +360,7 @@ public void cleanUpCluster() throws Exception {
protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
Path outputDir = getTestDir(TEST_NAME, "verify-output");
LOG.info("Verify output dir: " + outputDir);
Job job = Job.getInstance(conf);
job.setJarByClass(this.getClass());
@ -364,6 +385,139 @@ public void cleanUpCluster() throws Exception {
assertEquals(0, numOutputRecords);
}
/**
* Tool to search for missing rows in WALs and HFiles.
* Pass in a file or directory of keys to search for. The key file must have been written by
* the Verify step (we depend on the format it writes out). We read the keys in and then
* search for them in the hbase WALs and oldWALs dirs (some of this is still TODO).
*/
public static class WALSearcher extends WALPlayer {
public WALSearcher(Configuration conf) {
super(conf);
}
/**
* The actual searcher mapper.
*/
public static class WALMapperSearcher extends WALMapper {
private SortedSet<byte []> keysToFind;
private AtomicInteger rows = new AtomicInteger(0);
@Override
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
throws IOException {
super.setup(context);
try {
this.keysToFind = readKeysToSearch(context.getConfiguration());
LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
}
}
@Override
protected boolean filter(Context context, Cell cell) {
// TODO: Can I do a better compare than this copying out key?
byte [] row = new byte [cell.getRowLength()];
System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
boolean b = this.keysToFind.contains(row);
if (b) {
String keyStr = Bytes.toStringBinary(row);
try {
LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
} catch (IOException|InterruptedException e) {
LOG.warn(e);
}
if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
}
context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
}
return b;
}
}
// Put in place the above WALMapperSearcher.
@Override
public Job createSubmittableJob(String[] args) throws IOException {
Job job = super.createSubmittableJob(args);
// Call my class instead.
job.setJarByClass(WALMapperSearcher.class);
job.setMapperClass(WALMapperSearcher.class);
job.setOutputFormatClass(NullOutputFormat.class);
return job;
}
}
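As a quick orientation, a minimal sketch of driving the new search mode through ToolRunner, mirroring how the tool is normally launched; the verify-output path is illustrative, not part of this patch:

    Configuration conf = HBaseConfiguration.create();
    // "search" selects the WALSearcher path; the second arg is the dir the Verify step wrote.
    int ret = ToolRunner.run(conf, new IntegrationTestLoadAndVerify(),
        new String[] { "search", "/user/hbase/verify-output" });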
static final String FOUND_GROUP_KEY = "Found";
static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
static SortedSet<byte []> readKeysToSearch(final Configuration conf)
throws IOException, InterruptedException {
Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
FileSystem fs = FileSystem.get(conf);
SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
if (!fs.exists(keysInputDir)) {
throw new FileNotFoundException(keysInputDir.toString());
}
if (!fs.isDirectory(keysInputDir)) {
FileStatus keyFileStatus = fs.getFileStatus(keysInputDir);
readFileToSearch(conf, fs, keyFileStatus, result);
} else {
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
while(iterator.hasNext()) {
LocatedFileStatus keyFileStatus = iterator.next();
// Skip "_SUCCESS" file.
if (keyFileStatus.getPath().getName().startsWith("_")) continue;
readFileToSearch(conf, fs, keyFileStatus, result);
}
}
return result;
}
private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
final FileSystem fs, final FileStatus keyFileStatus, SortedSet<byte []> result)
throws IOException, InterruptedException {
// The Verify step uses file output format and writes <Text, Text>, so we can read it as a text file
try (InputStream in = fs.open(keyFileStatus.getPath());
BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
// Extract the key: the first token on each line is a missing row key
String line;
while ((line = reader.readLine()) != null) {
if (line.isEmpty()) continue;
String[] parts = line.split("\\s+");
if (parts.length >= 1) {
String key = parts[0];
result.add(Bytes.toBytesBinary(key));
} else {
LOG.info("Cannot parse key from: " + line);
}
}
}
return result;
}
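This parse works only if Bytes.toBytesBinary inverts the encoding the Verify step used when it wrote its <Text, Text> output, i.e. Bytes.toStringBinary. A self-contained sketch of that round trip (the sample key is made up):

    byte[] key = new byte[] { 0x00, 0x11, (byte) 0xFF };
    String encoded = Bytes.toStringBinary(key);    // non-printable bytes become \xNN escapes
    byte[] decoded = Bytes.toBytesBinary(encoded); // restores the original bytes
    assert Bytes.equals(key, decoded);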
private int doSearch(Configuration conf, String keysDir) throws Exception {
Path inputDir = new Path(keysDir);
getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
SortedSet<byte []> keys = readKeysToSearch(getConf());
if (keys.isEmpty()) throw new RuntimeException("No keys to find");
LOG.info("Count of keys to find: " + keys.size());
for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
// Now read all WALs, from both the active and the old-WALs dirs. Presumes the standard layout.
Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
LOG.info("Running Search with keys inputDir=" + inputDir +
" against " + getConf().get(HConstants.HBASE_DIR));
int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
if (ret != 0) return ret;
return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
}
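Note the argument pair handed to each WALSearcher run: WALPlayer takes <wal-dir> <tables> positionally, and the empty second argument presumably means no table filter, so every edit under the directory is mapped. The two directories are searched sequentially; a non-zero exit code from the first run short-circuits the second.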
private static void setJobScannerConf(Job job) {
// Make sure scanners log something useful to make debugging possible.
job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
@ -372,11 +526,8 @@ public void cleanUpCluster() throws Exception {
}
public Path getTestDir(String testName, String subdir) throws IOException {
//HBaseTestingUtility.getDataTestDirOnTestFs() has not been backported.
Path testDir = util.getDataTestDirOnTestFS(testName);
FileSystem fs = FileSystem.get(getConf());
Path base = new Path(fs.getWorkingDirectory(), "test-data");
String randomStr = UUID.randomUUID().toString();
Path testDir = new Path(base, randomStr);
fs.deleteOnExit(testDir);
return new Path(new Path(testDir, testName), subdir);
@ -399,7 +550,8 @@ public void cleanUpCluster() throws Exception {
}
public void usage() {
System.err.println(this.getClass().getSimpleName() + " [-Doptions] <load|verify|loadAndVerify>");
System.err.println(this.getClass().getSimpleName()
+ " [-Doptions] <load|verify|loadAndVerify|search>");
System.err.println(" Loads a table with row dependencies and verifies the dependency chains");
System.err.println("Options");
System.err.println(" -Dloadmapper.table=<name> Table to write/verify (default autogen)");
@ -418,11 +570,16 @@ public void cleanUpCluster() throws Exception {
super.processOptions(cmd);
String[] args = cmd.getArgs();
if (args == null || args.length < 1 || args.length > 1) {
if (args == null || args.length < 1) {
usage();
throw new RuntimeException("Incorrect Number of args.");
}
toRun = args[0];
if (toRun.equalsIgnoreCase("search")) {
if (args.length > 1) {
keysDir = args[1];
}
}
}
@Override
@ -430,16 +587,25 @@ public void cleanUpCluster() throws Exception {
IntegrationTestingUtility.setUseDistributedCluster(getConf());
boolean doLoad = false;
boolean doVerify = false;
boolean doSearch = false;
boolean doDelete = getConf().getBoolean("loadmapper.deleteAfter",true);
int numPresplits = getConf().getInt("loadmapper.numPresplits", 40);
if (toRun.equals("load")) {
if (toRun.equalsIgnoreCase("load")) {
doLoad = true;
} else if (toRun.equals("verify")) {
} else if (toRun.equalsIgnoreCase("verify")) {
doVerify = true;
} else if (toRun.equals("loadAndVerify")) {
} else if (toRun.equalsIgnoreCase("loadAndVerify")) {
doLoad = true;
doVerify = true;
} else if (toRun.equalsIgnoreCase("search")) {
doLoad = false;
doVerify = false;
doSearch = true;
if (keysDir == null) {
System.err.println("Usage: search <KEYS_DIR>]");
return 1;
}
} else {
System.err.println("Invalid argument " + toRun);
usage();
@ -451,9 +617,9 @@ public void cleanUpCluster() throws Exception {
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
try (Connection conn = ConnectionFactory.createConnection(getConf());
Admin admin = conn.getAdmin()) {
if (doLoad) {
if (doLoad) {
try (Connection conn = ConnectionFactory.createConnection(getConf());
Admin admin = conn.getAdmin()) {
admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits);
doLoad(getConf(), htd);
}
@ -464,6 +630,9 @@ public void cleanUpCluster() throws Exception {
getTestingUtil(getConf()).deleteTable(htd.getTableName());
}
}
if (doSearch) {
return doSearch(getConf(), keysDir);
}
return 0;
}

View File: AssignmentManager.java

@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.quotas.RegionStateListener;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -1862,11 +1863,19 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
region.getRegionNameAsString());
} catch (Throwable t) {
long sleepTime = 0;
Configuration conf = this.server.getConfiguration();
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
}
boolean logRetries = true;
if (t instanceof NotServingRegionException
if (t instanceof RegionServerAbortedException) {
// RS is aborting, we cannot offline the region since the region may need to do WAL
// recovery. Until we see the RS expiration, we should retry.
sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
} else if (t instanceof NotServingRegionException
|| t instanceof RegionServerStoppedException
|| t instanceof ServerNotRunningYetException) {
LOG.debug("Offline " + region.getRegionNameAsString()
@ -1880,8 +1889,6 @@ public class AssignmentManager extends ZooKeeperListener {
return;
} else if ((t instanceof FailedServerException) || (state != null &&
t instanceof RegionAlreadyInTransitionException)) {
long sleepTime = 0;
Configuration conf = this.server.getConfiguration();
if(t instanceof FailedServerException) {
sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
@ -1904,19 +1911,20 @@ public class AssignmentManager extends ZooKeeperListener {
logRetries = false;
}
}
try {
if (sleepTime > 0) {
Thread.sleep(sleepTime);
}
} catch (InterruptedException ie) {
LOG.warn("Failed to unassign "
+ region.getRegionNameAsString() + " since interrupted", ie);
Thread.currentThread().interrupt();
if (state != null) {
regionStates.updateRegionState(region, State.FAILED_CLOSE);
}
return;
}
try {
if (sleepTime > 0) {
Thread.sleep(sleepTime);
}
} catch (InterruptedException ie) {
LOG.warn("Failed to unassign "
+ region.getRegionNameAsString() + " since interrupted", ie);
Thread.currentThread().interrupt();
if (state != null) {
regionStates.updateRegionState(region, State.FAILED_CLOSE);
}
return;
}
if (logRetries) {
@ -2002,7 +2010,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
@SuppressWarnings("deprecation")
private boolean wasRegionOnDeadServerByMeta(
protected boolean wasRegionOnDeadServerByMeta(
final HRegionInfo region, final ServerName sn) {
try {
if (region.isMetaRegion()) {
@ -2489,7 +2497,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (state == null || state.getServerName() == null) {
// We don't know where the region is, offline it.
// No need to send CLOSE RPC
LOG.warn("Attempting to unassign a region not in RegionStates"
LOG.warn("Attempting to unassign a region not in RegionStates "
+ region.getRegionNameAsString() + ", offlined");
regionOffline(region);
return;
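For scale: RpcClient.FAILED_SERVER_EXPIRY_DEFAULT is 2000 ms in this codebase (a detail from the RpcClient source, not visible in this diff), so the aborted-server branch backs off just over two seconds between close attempts, comfortably past the failed-server expiry before the next RPC is tried.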

View File: RSRpcServices.java

@ -980,9 +980,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws IOException
*/
protected void checkOpen() throws IOException {
if (regionServer.isStopped() || regionServer.isAborted()) {
throw new RegionServerStoppedException("Server " + regionServer.serverName
+ " not running" + (regionServer.isAborted() ? ", aborting" : ""));
if (regionServer.isAborted()) {
throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
}
if (regionServer.isStopped()) {
throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
}
if (!regionServer.fsOk) {
throw new RegionServerStoppedException("File system not available");

View File: TestAssignmentManager.java

@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -1611,4 +1612,79 @@ public class TestAssignmentManager {
am.shutdown();
}
}
/**
* Tests close region call on a region server that is aborting
*/
@Test (timeout=180000)
public void testCloseRegionOnAbortingRS() throws Exception {
this.server.getConfiguration().setInt("hbase.assignment.maximum.attempts", 2);
HRegionInfo hri = REGIONINFO;
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
server.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, balancer, null, null, master.getTableLockManager());
RegionStates regionStates = am.getRegionStates();
regionStates.createRegionState(hri, State.OPEN, SERVERNAME_B, SERVERNAME_B);
// mock aborting region server
Mockito.when(this.serverManager.sendRegionClose(Mockito.eq(SERVERNAME_B), Mockito.eq(REGIONINFO),
Mockito.anyInt(), (ServerName)Mockito.any(), Mockito.anyBoolean()))
.thenThrow(new RegionServerAbortedException(""));
// try to unassign the region
am.unassign(hri);
// assert that we have FAILED_CLOSE as the region state
assertEquals(State.FAILED_CLOSE, regionStates.getRegionState(REGIONINFO).getState());
assertEquals(SERVERNAME_B, regionStates.getRegionState(REGIONINFO).getServerName());
am.shutdown();
}
/**
* Tests close region call on a region server that is not in the online servers list
*/
@Test (timeout=180000)
public void testCloseRegionOnServerNotOnline() throws Exception {
this.server.getConfiguration().setInt("hbase.assignment.maximum.attempts", 2);
HRegionInfo hri = REGIONINFO;
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(
server.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, balancer, null, null, master.getTableLockManager()) {
@Override
protected boolean wasRegionOnDeadServerByMeta(HRegionInfo region, ServerName sn) {
return true;
};
};
RegionStates regionStates = am.getRegionStates();
regionStates.createRegionState(hri, State.OPEN, SERVERNAME_B, SERVERNAME_B);
// mock that RS is expired, but not processed
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B))
.thenReturn(false);
// try to unassign the region
am.unassign(hri);
// assert that we have OFFLINE
assertEquals(State.OFFLINE, regionStates.getRegionState(REGIONINFO).getState());
// try to assign the region before SSH
am.regionPlans.put(REGIONINFO.getEncodedName(),
new RegionPlan(REGIONINFO, null, SERVERNAME_A));
am.assign(hri, true, false);
// assert that we still have OFFLINE
assertEquals(State.OFFLINE, regionStates.getRegionState(REGIONINFO).getState());
am.shutdown();
}
}