HBASE-20908 Infinite loop on regionserver if region replica are reduced
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
067388bfd9
commit
eb906e20ee
|
@ -31,7 +31,6 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
|
@ -70,8 +69,10 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
|
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
|
||||||
|
|
||||||
|
@ -276,7 +277,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
|
EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
|
||||||
int numWriters, int operationTimeout) {
|
int numWriters, int operationTimeout) {
|
||||||
super(controller, entryBuffers, numWriters);
|
super(controller, entryBuffers, numWriters);
|
||||||
this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
|
this.sinkWriter =
|
||||||
|
new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
|
||||||
this.tableDescriptors = tableDescriptors;
|
this.tableDescriptors = tableDescriptors;
|
||||||
|
|
||||||
// A cache for the table "memstore replication enabled" flag.
|
// A cache for the table "memstore replication enabled" flag.
|
||||||
|
@ -390,9 +392,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
int operationTimeout;
|
int operationTimeout;
|
||||||
ExecutorService pool;
|
ExecutorService pool;
|
||||||
Cache<TableName, Boolean> disabledAndDroppedTables;
|
Cache<TableName, Boolean> disabledAndDroppedTables;
|
||||||
|
TableDescriptors tableDescriptors;
|
||||||
|
|
||||||
public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
|
public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
|
||||||
ExecutorService pool, int operationTimeout) {
|
ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
|
||||||
this.sink = sink;
|
this.sink = sink;
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.operationTimeout = operationTimeout;
|
this.operationTimeout = operationTimeout;
|
||||||
|
@ -400,6 +403,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
= RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
|
= RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
|
||||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
|
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
|
||||||
this.pool = pool;
|
this.pool = pool;
|
||||||
|
this.tableDescriptors = tableDescriptors;
|
||||||
|
|
||||||
int nonExistentTableCacheExpiryMs = connection.getConfiguration()
|
int nonExistentTableCacheExpiryMs = connection.getConfiguration()
|
||||||
.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
|
.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
|
||||||
|
@ -506,13 +510,14 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean tasksCancelled = false;
|
boolean tasksCancelled = false;
|
||||||
for (Future<ReplicateWALEntryResponse> task : tasks) {
|
for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
|
||||||
try {
|
try {
|
||||||
task.get();
|
tasks.get(replicaId).get();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new InterruptedIOException(e.getMessage());
|
throw new InterruptedIOException(e.getMessage());
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
Throwable cause = e.getCause();
|
Throwable cause = e.getCause();
|
||||||
|
boolean canBeSkipped = false;
|
||||||
if (cause instanceof IOException) {
|
if (cause instanceof IOException) {
|
||||||
// The table can be disabled or dropped at this time. For disabled tables, we have no
|
// The table can be disabled or dropped at this time. For disabled tables, we have no
|
||||||
// cheap mechanism to detect this case because meta does not contain this information.
|
// cheap mechanism to detect this case because meta does not contain this information.
|
||||||
|
@ -520,21 +525,34 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
||||||
// RPC. So instead we start the replay RPC with retries and check whether the table is
|
// RPC. So instead we start the replay RPC with retries and check whether the table is
|
||||||
// dropped or disabled which might cause SocketTimeoutException, or
|
// dropped or disabled which might cause SocketTimeoutException, or
|
||||||
// RetriesExhaustedException or similar if we get IOE.
|
// RetriesExhaustedException or similar if we get IOE.
|
||||||
if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
|
if (cause instanceof TableNotFoundException
|
||||||
|
|| connection.isTableDisabled(tableName)) {
|
||||||
|
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
|
||||||
|
canBeSkipped = true;
|
||||||
|
} else if (tableDescriptors != null) {
|
||||||
|
TableDescriptor tableDescriptor = tableDescriptors.get(tableName);
|
||||||
|
if (tableDescriptor != null
|
||||||
|
//(replicaId + 1) as no task is added for primary replica for replication
|
||||||
|
&& tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
|
||||||
|
canBeSkipped = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (canBeSkipped) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
|
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
|
||||||
+ " because received exception for dropped or disabled table", cause);
|
+ " because received exception for dropped or disabled table",
|
||||||
|
cause);
|
||||||
for (Entry entry : entries) {
|
for (Entry entry : entries) {
|
||||||
LOG.trace("Skipping : " + entry);
|
LOG.trace("Skipping : " + entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
|
|
||||||
if (!tasksCancelled) {
|
if (!tasksCancelled) {
|
||||||
sink.getSkippedEditsCounter().addAndGet(entries.size());
|
sink.getSkippedEditsCounter().addAndGet(entries.size());
|
||||||
tasksCancelled = true; // so that we do not add to skipped counter again
|
tasksCancelled = true; // so that we do not add to skipped counter again
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise rethrow
|
// otherwise rethrow
|
||||||
throw (IOException)cause;
|
throw (IOException)cause;
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,11 @@ import java.util.List;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.Cell.Type;
|
||||||
|
import org.apache.hadoop.hbase.CellBuilderFactory;
|
||||||
|
import org.apache.hadoop.hbase.CellBuilderType;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -50,6 +55,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||||
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
|
@ -263,7 +270,7 @@ public class TestRegionReplicaReplicationEndpoint {
|
||||||
for (int i = 1; i < regionReplication; i++) {
|
for (int i = 1; i < regionReplication; i++) {
|
||||||
final Region region = regions[i];
|
final Region region = regions[i];
|
||||||
// wait until all the data is replicated to all secondary regions
|
// wait until all the data is replicated to all secondary regions
|
||||||
Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() {
|
Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
LOG.info("verifying replication for region replica:" + region.getRegionInfo());
|
LOG.info("verifying replication for region replica:" + region.getRegionInfo());
|
||||||
|
@ -342,7 +349,6 @@ public class TestRegionReplicaReplicationEndpoint {
|
||||||
|
|
||||||
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
|
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
|
||||||
Table table = connection.getTable(tableName);
|
Table table = connection.getTable(tableName);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// load the data to the table
|
// load the data to the table
|
||||||
|
|
||||||
|
@ -364,26 +370,35 @@ public class TestRegionReplicaReplicationEndpoint {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
|
public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
|
||||||
testRegionReplicaReplicationIgnoresDisabledTables(false);
|
testRegionReplicaReplicationIgnores(false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
|
public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
|
||||||
testRegionReplicaReplicationIgnoresDisabledTables(true);
|
testRegionReplicaReplicationIgnores(true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
|
@Test
|
||||||
|
public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
|
||||||
|
testRegionReplicaReplicationIgnores(false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
// tests having edits from a disabled or dropped table is handled correctly by skipping those
|
// tests having edits from a disabled or dropped table is handled correctly by skipping those
|
||||||
// entries and further edits after the edits from dropped/disabled table can be replicated
|
// entries and further edits after the edits from dropped/disabled table can be replicated
|
||||||
// without problems.
|
// without problems.
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName() + dropTable);
|
final TableName tableName = TableName.valueOf(
|
||||||
|
name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
|
||||||
HTableDescriptor htd = HTU.createTableDescriptor(tableName);
|
HTableDescriptor htd = HTU.createTableDescriptor(tableName);
|
||||||
int regionReplication = 3;
|
int regionReplication = 3;
|
||||||
htd.setRegionReplication(regionReplication);
|
htd.setRegionReplication(regionReplication);
|
||||||
HTU.deleteTableIfAny(tableName);
|
HTU.deleteTableIfAny(tableName);
|
||||||
|
|
||||||
HTU.getAdmin().createTable(htd);
|
HTU.getAdmin().createTable(htd);
|
||||||
TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
|
TableName toBeDisabledTable = TableName.valueOf(
|
||||||
|
dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
|
||||||
HTU.deleteTableIfAny(toBeDisabledTable);
|
HTU.deleteTableIfAny(toBeDisabledTable);
|
||||||
htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
|
htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
|
||||||
htd.setRegionReplication(regionReplication);
|
htd.setRegionReplication(regionReplication);
|
||||||
|
@ -405,28 +420,44 @@ public class TestRegionReplicaReplicationEndpoint {
|
||||||
RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
|
RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
|
||||||
mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
|
mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
|
||||||
when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
|
when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
|
||||||
|
FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
|
||||||
|
FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
|
||||||
RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
|
RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
|
||||||
new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
|
new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
|
||||||
(ClusterConnection) connection,
|
(ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
|
||||||
Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
|
fstd);
|
||||||
RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
|
RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
|
||||||
HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
|
HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
|
||||||
byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
|
byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
|
||||||
|
|
||||||
|
Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
|
||||||
|
.setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
|
||||||
Entry entry = new Entry(
|
Entry entry = new Entry(
|
||||||
new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
|
new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
|
||||||
new WALEdit());
|
new WALEdit()
|
||||||
|
.add(cell));
|
||||||
|
|
||||||
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
|
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
|
||||||
if (dropTable) {
|
if (dropTable) {
|
||||||
HTU.getAdmin().deleteTable(toBeDisabledTable);
|
HTU.getAdmin().deleteTable(toBeDisabledTable);
|
||||||
|
} else if (disableReplication) {
|
||||||
|
htd.setRegionReplication(regionReplication - 2);
|
||||||
|
HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
|
||||||
|
HTU.getAdmin().enableTable(toBeDisabledTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
sinkWriter.append(toBeDisabledTable, encodedRegionName,
|
sinkWriter.append(toBeDisabledTable, encodedRegionName,
|
||||||
HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
|
HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
|
||||||
|
|
||||||
assertEquals(2, skippedEdits.get());
|
assertEquals(2, skippedEdits.get());
|
||||||
|
|
||||||
|
if (disableReplication) {
|
||||||
|
// enable replication again so that we can verify replication
|
||||||
|
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
|
||||||
|
htd.setRegionReplication(regionReplication);
|
||||||
|
HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
|
||||||
|
HTU.getAdmin().enableTable(toBeDisabledTable);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// load some data to the to-be-dropped table
|
// load some data to the to-be-dropped table
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue