HBASE-20908 Infinite loop on regionserver if region replica are reduced

Signed-off-by: tedyu <yuzhihong@gmail.com>
Ankit Singhal 2018-07-19 14:58:59 -07:00 committed by tedyu
parent 810473b277
commit e8a7a4472f
2 changed files with 68 additions and 19 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java

@@ -31,7 +31,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
@@ -70,8 +69,10 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
@@ -276,7 +277,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
         int numWriters, int operationTimeout) {
       super(controller, entryBuffers, numWriters);
-      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
+      this.sinkWriter =
+          new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
       this.tableDescriptors = tableDescriptors;
       // A cache for the table "memstore replication enabled" flag.
@@ -390,9 +392,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     int operationTimeout;
     ExecutorService pool;
     Cache<TableName, Boolean> disabledAndDroppedTables;
+    TableDescriptors tableDescriptors;

     public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
-        ExecutorService pool, int operationTimeout) {
+        ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
       this.sink = sink;
       this.connection = connection;
       this.operationTimeout = operationTimeout;
@@ -400,6 +403,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
           = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
       this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
       this.pool = pool;
+      this.tableDescriptors = tableDescriptors;

       int nonExistentTableCacheExpiryMs = connection.getConfiguration()
           .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
@@ -506,13 +510,14 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       }

       boolean tasksCancelled = false;
-      for (Future<ReplicateWALEntryResponse> task : tasks) {
+      for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
         try {
-          task.get();
+          tasks.get(replicaId).get();
         } catch (InterruptedException e) {
           throw new InterruptedIOException(e.getMessage());
         } catch (ExecutionException e) {
           Throwable cause = e.getCause();
+          boolean canBeSkipped = false;
           if (cause instanceof IOException) {
             // The table can be disabled or dropped at this time. For disabled tables, we have no
             // cheap mechanism to detect this case because meta does not contain this information.
@@ -520,21 +525,34 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
             // RPC. So instead we start the replay RPC with retries and check whether the table is
             // dropped or disabled which might cause SocketTimeoutException, or
             // RetriesExhaustedException or similar if we get IOE.
-            if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
+            if (cause instanceof TableNotFoundException
+                || connection.isTableDisabled(tableName)) {
+              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
+              canBeSkipped = true;
+            } else if (tableDescriptors != null) {
+              TableDescriptor tableDescriptor = tableDescriptors.get(tableName);
+              if (tableDescriptor != null
+                  //(replicaId + 1) as no task is added for primary replica for replication
+                  && tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
+                canBeSkipped = true;
+              }
+            }
+            if (canBeSkipped) {
               if (LOG.isTraceEnabled()) {
                 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
-                    + " because received exception for dropped or disabled table", cause);
+                    + " because received exception for dropped or disabled table",
+                    cause);
                 for (Entry entry : entries) {
                   LOG.trace("Skipping : " + entry);
                 }
               }
-              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
               if (!tasksCancelled) {
                 sink.getSkippedEditsCounter().addAndGet(entries.size());
                 tasksCancelled = true; // so that we do not add to skipped counter again
               }
               continue;
             }
             // otherwise rethrow
             throw (IOException)cause;
           }
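
The core of the fix is the new `else if` branch above: the sink writer submits one replay task per secondary replica, so the task at index `replicaId` targets replica id `replicaId + 1` (the primary replica, id 0, gets no task). If the table descriptor now declares fewer replicas than the one a failed task targeted, that replica was removed, and the edits can be skipped instead of retried forever. A minimal standalone sketch of just that decision, with hypothetical names (`ReplicaSkipCheck`, `canSkipFailedReplay`, `taskIndex`):

    import org.apache.hadoop.hbase.client.TableDescriptor;

    final class ReplicaSkipCheck {
      // tasks.get(taskIndex) replays to replica id taskIndex + 1, because no task
      // is submitted for the primary replica (id 0). Replica ids run from 0 to
      // getRegionReplication() - 1, so the targeted replica no longer exists once
      // getRegionReplication() <= taskIndex + 1.
      static boolean canSkipFailedReplay(int taskIndex, TableDescriptor descriptor) {
        int targetReplicaId = taskIndex + 1; // primary replica has no replay task
        return descriptor != null
            && descriptor.getRegionReplication() <= targetReplicaId;
      }
    }

For example, after region replication drops from 3 to 1, both replay tasks (targeting replica ids 1 and 2) satisfy the check, their edits are counted as skipped exactly once, and replication proceeds instead of looping.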

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java

@@ -29,6 +29,11 @@ import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -50,6 +55,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -263,7 +270,7 @@ public class TestRegionReplicaReplicationEndpoint {
     for (int i = 1; i < regionReplication; i++) {
       final Region region = regions[i];
       // wait until all the data is replicated to all secondary regions
-      Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() {
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
         @Override
         public boolean evaluate() throws Exception {
           LOG.info("verifying replication for region replica:" + region.getRegionInfo());
@@ -342,7 +349,6 @@ public class TestRegionReplicaReplicationEndpoint {

     Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
     Table table = connection.getTable(tableName);
-
     try {
       // load the data to the table
@@ -364,26 +370,35 @@ public class TestRegionReplicaReplicationEndpoint {
   @Test
   public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(false);
+    testRegionReplicaReplicationIgnores(false, false);
   }

   @Test
   public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(true);
+    testRegionReplicaReplicationIgnores(true, false);
   }

-  public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
+  @Test
+  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
+    testRegionReplicaReplicationIgnores(false, true);
+  }
+
+  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
       throws Exception {
     // tests having edits from a disabled or dropped table is handled correctly by skipping those
     // entries and further edits after the edits from dropped/disabled table can be replicated
     // without problems.
-    final TableName tableName = TableName.valueOf(name.getMethodName() + dropTable);
+    final TableName tableName = TableName.valueOf(
+      name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
     HTableDescriptor htd = HTU.createTableDescriptor(tableName);
     int regionReplication = 3;
     htd.setRegionReplication(regionReplication);
     HTU.deleteTableIfAny(tableName);
     HTU.getAdmin().createTable(htd);

-    TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
+    TableName toBeDisabledTable = TableName.valueOf(
+      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
     HTU.deleteTableIfAny(toBeDisabledTable);
     htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
     htd.setRegionReplication(regionReplication);
@@ -405,28 +420,44 @@ public class TestRegionReplicaReplicationEndpoint {
     RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
         mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
     when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
+    FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
+      FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
     RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
         new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
-            (ClusterConnection) connection,
-            Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
+            (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
+            fstd);
     RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
     HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
     byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();

+    Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
+        .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
     Entry entry = new Entry(
         new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
-        new WALEdit());
+        new WALEdit()
+            .add(cell));

     HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
     if (dropTable) {
       HTU.getAdmin().deleteTable(toBeDisabledTable);
+    } else if (disableReplication) {
+      htd.setRegionReplication(regionReplication - 2);
+      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getAdmin().enableTable(toBeDisabledTable);
     }

     sinkWriter.append(toBeDisabledTable, encodedRegionName,
       HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));

     assertEquals(2, skippedEdits.get());
+
+    if (disableReplication) {
+      // enable replication again so that we can verify replication
+      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
+      htd.setRegionReplication(regionReplication);
+      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getAdmin().enableTable(toBeDisabledTable);
+    }

     try {
       // load some data to the to-be-dropped table
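
For reference, the disable/modify/enable cycle the test performs is how region replication is changed on an existing table, since the replica count lives in the table descriptor. A minimal sketch of the same cycle, assuming the HBase 2.x Admin API and using the non-deprecated `TableDescriptorBuilder` route rather than the `HTableDescriptor` overload the test calls:

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

    final class ReplicaCountUtil {
      // Lower (or raise) the replica count of an existing table. Mirroring the
      // test, the table is taken offline while its descriptor is modified, then
      // brought back online.
      static void setRegionReplication(Admin admin, TableName table, int replicas)
          throws IOException {
        admin.disableTable(table);
        admin.modifyTable(TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
            .setRegionReplication(replicas).build());
        admin.enableTable(table);
      }
    }

Reducing the count this way (e.g. from 3 to 1) is exactly the scenario that previously left the replication endpoint retrying edits for replicas that no longer exist.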