HBASE-24220 Allow that zk NOTEMPTY multi exception is retryable by running in-series
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
 Cleanup checkstyle warnings. Don't depend on hbase-client ScannerCallable.
hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
 Cut down on cluster resource usage.
hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
 Debug
hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
 Debug
hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftHttpServer.java
 Debug
hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
 Debug
hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
 Allow that NOTEMPTY is retryable by running in series.
parent bcacc4ce93
commit 2d2e1d965d
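Why the title says "retryable by running in-series": when a batched ZooKeeper multi fails with a code that might not recur if the same ops are applied one at a time, ZKUtil can fall back to sequential execution, provided the caller passed runSequentialOnMultiFailure. This commit adds NOTEMPTY to that set of codes (see the ZKUtil hunk at the end of the diff). Below is a minimal sketch of the pattern against the plain ZooKeeper client API; the class and method names (MultiOrSequentialSketch, runOps, runSequentially) are illustrative, not the actual ZKUtil code.

import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooKeeper;

// Illustrative sketch of the multi-or-sequential pattern (not the real ZKUtil code).
public final class MultiOrSequentialSketch {

  // Try the ops as one atomic multi; on a retryable code, replay them in series.
  public static void runOps(ZooKeeper zk, List<Op> ops, boolean runSequentialOnMultiFailure)
      throws KeeperException, InterruptedException {
    try {
      zk.multi(ops);
    } catch (KeeperException ke) {
      switch (ke.code()) {
        case NONODE:
        case BADVERSION:
        case NOAUTH:
        case NOTEMPTY: // the code this commit adds to the retryable set
          if (runSequentialOnMultiFailure) {
            runSequentially(zk, ops);
            break;
          }
          // fall through: sequential retry was not requested by the caller
        default:
          throw ke;
      }
    }
  }

  // Apply each op on its own, so ops that can succeed are applied instead of
  // the whole batch aborting atomically.
  private static void runSequentially(ZooKeeper zk, List<Op> ops)
      throws KeeperException, InterruptedException {
    for (Op op : ops) {
      zk.multi(Collections.singletonList(op));
    }
  }
}

The sequential fallback matters for NOTEMPTY because a delete in the batch can target a znode whose children change concurrently; replayed one at a time, the child ops land (or fail individually) before the parent delete runs, instead of the whole atomic multi aborting.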
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.mapred;
 
 import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
-
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -58,9 +57,6 @@ public class TableRecordReaderImpl {
 
   /**
    * Restart from survivable exceptions by creating a new scanner.
-   *
-   * @param firstRow
-   * @throws IOException
    */
   public void restart(byte[] firstRow) throws IOException {
     Scan currentScan;
@@ -100,8 +96,6 @@ public class TableRecordReaderImpl {
 
   /**
    * Build the scanner. Not done in constructor to allow for extension.
-   *
-   * @throws IOException
    */
   public void init() throws IOException {
     restart(startRow);
@@ -194,10 +188,8 @@ public class TableRecordReaderImpl {
    * @param key HStoreKey as input key.
    * @param value MapWritable as input value
    * @return true if there was more data
-   * @throws IOException
    */
-  public boolean next(ImmutableBytesWritable key, Result value)
-  throws IOException {
+  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
     Result result;
     try {
       try {
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -37,7 +37,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -102,10 +101,9 @@ public class TableRecordReaderImpl {
    * In new mapreduce APIs, TaskAttemptContext has two getCounter methods
    * Check if getCounter(String, String) method is available.
    * @return The getCounter method or null if not available.
-   * @throws IOException
    */
   protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
     throws IOException {
     Method m = null;
     try {
       m = context.getClass().getMethod("getCounter",
@@ -142,9 +140,6 @@ public class TableRecordReaderImpl {
 
   /**
    * Build the scanner. Not done in constructor to allow for extension.
-   *
-   * @throws IOException
-   * @throws InterruptedException
    */
   public void initialize(InputSplit inputsplit,
       TaskAttemptContext context) throws IOException,
@@ -176,7 +171,6 @@ public class TableRecordReaderImpl {
    * Returns the current key.
    *
    * @return The current key.
-   * @throws IOException
    * @throws InterruptedException When the job is aborted.
    */
   public ImmutableBytesWritable getCurrentKey() throws IOException,
@@ -204,12 +198,18 @@ public class TableRecordReaderImpl {
    * @throws InterruptedException When the job was aborted.
    */
   public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (key == null) key = new ImmutableBytesWritable();
-    if (value == null) value = new Result();
+    if (key == null) {
+      key = new ImmutableBytesWritable();
+    }
+    if (value == null) {
+      value = new Result();
+    }
     try {
       try {
         value = this.scanner.next();
-        if (value != null && value.isStale()) numStale++;
+        if (value != null && value.isStale()) {
+          numStale++;
+        }
         if (logScannerActivity) {
           rowcount ++;
           if (rowcount >= logPerRowCount) {
@@ -242,7 +242,9 @@ public class TableRecordReaderImpl {
           scanner.next();    // skip presumed already mapped row
         }
         value = scanner.next();
-        if (value != null && value.isStale()) numStale++;
+        if (value != null && value.isStale()) {
+          numStale++;
+        }
         numRestarts++;
       }
 
@@ -281,7 +283,6 @@ public class TableRecordReaderImpl {
    * counters thus can update counters based on scanMetrics.
    * If hbase runs on old version of mapreduce, it won't be able to get
    * access to counters and TableRecorderReader can't update counter values.
-   * @throws IOException
    */
   private void updateCounters() throws IOException {
     ScanMetrics scanMetrics = scanner.getScanMetrics();
hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
@@ -92,7 +92,7 @@ public class TestExportSnapshot {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     setUpBaseConf(TEST_UTIL.getConfiguration());
-    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.startMiniCluster(1);
     TEST_UTIL.startMiniMapReduceCluster();
   }
 
hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
@@ -30,7 +30,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -43,6 +42,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -92,7 +92,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -225,7 +224,7 @@ public abstract class AbstractTestDLS {
       Path editsdir = WALSplitUtil
           .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
             tableName, hri.getEncodedName()));
-      LOG.debug("checking edits dir " + editsdir);
+      LOG.debug("Checking edits dir " + editsdir);
       FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
         @Override
         public boolean accept(Path p) {
@@ -235,9 +234,9 @@ public abstract class AbstractTestDLS {
           return true;
         }
       });
-      assertTrue(
-        "edits dir should have more than a single file in it. instead has " + files.length,
-        files.length > 1);
+      LOG.info("Files {}", Arrays.stream(files).map(f -> f.getPath().toString()).
+        collect(Collectors.joining(",")));
+      assertTrue("Edits dir should have more than a one file", files.length > 1);
       for (int i = 0; i < files.length; i++) {
         int c = countWAL(files[i].getPath(), fs, conf);
         count += c;
hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController3.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.security.access;
 import static org.apache.hadoop.hbase.AuthUtil.toGroupEntry;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -195,14 +194,13 @@ public class TestAccessController3 extends SecureTestUtil {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    HRegionServer rs = null;
-    for (JVMClusterUtil.RegionServerThread thread:
-        TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
-      rs = thread.getRegionServer();
-    }
+    assertEquals(1, TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().size());
+    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().get(0).
+      getRegionServer();
+    // Strange place for an assert.
+    assertFalse("RegionServer should have ABORTED (FaultyAccessController)", rs.isAborted());
     cleanUp();
     TEST_UTIL.shutdownMiniCluster();
-    assertFalse("region server should have aborted due to FaultyAccessController", rs.isAborted());
   }
 
   private static void setUpTableAndUserPermissions() throws Exception {
hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftHttpServer.java
@@ -207,7 +207,8 @@ public class TestThriftHttpServer {
     HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
     conn.setRequestMethod("TRACE");
     conn.connect();
-    Assert.assertEquals(HttpURLConnection.HTTP_FORBIDDEN, conn.getResponseCode());
+    Assert.assertEquals(conn.getResponseMessage(),
+      HttpURLConnection.HTTP_FORBIDDEN, conn.getResponseCode());
   }
 
   protected static volatile boolean tableCreated = false;
hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
@@ -323,19 +323,18 @@ public class MiniZooKeeperCluster {
   public void shutdown() throws IOException {
     // shut down all the zk servers
     for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
-      NIOServerCnxnFactory standaloneServerFactory =
-        standaloneServerFactoryList.get(i);
+      NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
       int clientPort = clientPortList.get(i);
-
       standaloneServerFactory.shutdown();
       if (!waitForServerDown(clientPort, connectionTimeout)) {
-        throw new IOException("Waiting for shutdown of standalone server");
+        throw new IOException("Waiting for shutdown of standalone server at port=" + clientPort +
+          ", timeout=" + this.connectionTimeout);
       }
     }
     standaloneServerFactoryList.clear();
 
     for (ZooKeeperServer zkServer: zooKeeperServers) {
-      //explicitly close ZKDatabase since ZookeeperServer does not close them
+      // Explicitly close ZKDatabase since ZookeeperServer does not close them
       zkServer.getZKDatabase().close();
     }
     zooKeeperServers.clear();
hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -36,6 +36,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
@@ -1535,6 +1536,10 @@ public final class ZKUtil {
   public abstract static class ZKUtilOp {
     private String path;
 
+    @Override public String toString() {
+      return this.getClass().getSimpleName() + ", path=" + this.path;
+    }
+
     private ZKUtilOp(String path) {
       this.path = path;
     }
@@ -1750,12 +1755,13 @@ public final class ZKUtil {
       case NONODE:
       case BADVERSION:
       case NOAUTH:
+      case NOTEMPTY:
        // if we get an exception that could be solved by running sequentially
        // (and the client asked us to), then break out and run sequentially
        if (runSequentialOnMultiFailure) {
-         LOG.info("On call to ZK.multi, received exception: " + ke.toString() + "."
-             + " Attempting to run operations sequentially because"
-             + " runSequentialOnMultiFailure is: " + runSequentialOnMultiFailure + ".");
+         LOG.info("multi exception: {}; running operations sequentially " +
+           "(runSequentialOnMultiFailure=true); {}", ke.toString(),
+           ops.stream().map(o -> o.toString()).collect(Collectors.joining(",")));
          processSequentially(zkw, ops);
          break;
        }
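For callers, the public entry point is unchanged: ZKUtil.multiOrSequential(zkw, ops, runSequentialOnMultiFailure) already existed, and this commit only widens the set of codes that trigger the sequential fallback (and adds ZKUtilOp#toString so the new log line can print the ops). A hedged usage sketch; the helper class and znode paths below are hypothetical:

import java.util.Arrays;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;

// Hypothetical caller: delete a parent znode and its known children in one batch.
// With runSequentialOnMultiFailure=true, a NOTEMPTY from the atomic multi now falls
// back to applying the ops one at a time, so the ops that can succeed are applied
// instead of the whole batch aborting.
public final class MultiOrSequentialUsage {
  public static void deleteQueue(ZKWatcher zkw, String queueZnode) throws KeeperException {
    ZKUtil.multiOrSequential(zkw,
      Arrays.asList(
        ZKUtilOp.deleteNodeFailSilent(queueZnode + "/wal-1"), // hypothetical child znode
        ZKUtilOp.deleteNodeFailSilent(queueZnode)),
      true /* runSequentialOnMultiFailure */);
  }
}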