HBASE-20908 Infinite loop on regionserver if region replica are reduced
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
2eaa24a132
commit
56aa9ab8a3
|
@ -323,7 +323,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
|
||||
int numWriters, int operationTimeout) {
|
||||
super(controller, entryBuffers, numWriters);
|
||||
this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
|
||||
this.sinkWriter =
|
||||
new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
|
||||
this.tableDescriptors = tableDescriptors;
|
||||
|
||||
// A cache for the table "memstore replication enabled" flag.
|
||||
|
@ -437,9 +438,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
int operationTimeout;
|
||||
ExecutorService pool;
|
||||
Cache<TableName, Boolean> disabledAndDroppedTables;
|
||||
TableDescriptors tableDescriptors;
|
||||
|
||||
public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
|
||||
ExecutorService pool, int operationTimeout) {
|
||||
ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
|
||||
this.sink = sink;
|
||||
this.connection = connection;
|
||||
this.operationTimeout = operationTimeout;
|
||||
|
@ -447,6 +449,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
= RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
|
||||
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
|
||||
this.pool = pool;
|
||||
this.tableDescriptors = tableDescriptors;
|
||||
|
||||
int nonExistentTableCacheExpiryMs = connection.getConfiguration()
|
||||
.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
|
||||
|
@ -555,13 +558,14 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
}
|
||||
|
||||
boolean tasksCancelled = false;
|
||||
for (Future<ReplicateWALEntryResponse> task : tasks) {
|
||||
for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
|
||||
try {
|
||||
task.get();
|
||||
tasks.get(replicaId).get();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (ExecutionException e) {
|
||||
Throwable cause = e.getCause();
|
||||
boolean canBeSkipped = false;
|
||||
if (cause instanceof IOException) {
|
||||
// The table can be disabled or dropped at this time. For disabled tables, we have no
|
||||
// cheap mechanism to detect this case because meta does not contain this information.
|
||||
|
@ -570,14 +574,26 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
// check whether the table is dropped or disabled which might cause
|
||||
// SocketTimeoutException, or RetriesExhaustedException or similar if we get IOE.
|
||||
if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
|
||||
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
|
||||
canBeSkipped = true;
|
||||
} else if (tableDescriptors != null) {
|
||||
HTableDescriptor tableDescriptor = tableDescriptors.get(tableName);
|
||||
if (tableDescriptor != null
|
||||
// (replicaId + 1) as no task is added for primary replica for replication
|
||||
&& tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
|
||||
canBeSkipped = true;
|
||||
}
|
||||
}
|
||||
if (canBeSkipped) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
|
||||
+ " because received exception for dropped or disabled table", cause);
|
||||
+ " because received exception for dropped or disabled table",
|
||||
cause);
|
||||
for (Entry entry : entries) {
|
||||
LOG.trace("Skipping : " + entry);
|
||||
}
|
||||
}
|
||||
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
|
||||
|
||||
if (!tasksCancelled) {
|
||||
sink.getSkippedEditsCounter().addAndGet(entries.size());
|
||||
tasksCancelled = true; // so that we do not add to skipped counter again
|
||||
|
|
|
@ -18,41 +18,49 @@
|
|||
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -60,8 +68,6 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
|
||||
* async wal replication replays the edits to the secondary region in various scenarios.
|
||||
|
@ -229,7 +235,7 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
for (int i = 1; i < regionReplication; i++) {
|
||||
final Region region = regions[i];
|
||||
// wait until all the data is replicated to all secondary regions
|
||||
Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() {
|
||||
Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
LOG.info("verifying replication for region replica:" + region.getRegionInfo());
|
||||
|
@ -307,7 +313,6 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
|
||||
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
|
||||
Table table = connection.getTable(tableName);
|
||||
|
||||
try {
|
||||
// load the data to the table
|
||||
|
||||
|
@ -327,29 +332,35 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 240000)
|
||||
@Test(timeout = 240000)
|
||||
public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
|
||||
testRegionReplicaReplicationIgnoresDisabledTables(false);
|
||||
testRegionReplicaReplicationIgnores(false, false);
|
||||
}
|
||||
|
||||
@Test (timeout = 240000)
|
||||
@Test(timeout = 240000)
|
||||
public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
|
||||
testRegionReplicaReplicationIgnoresDisabledTables(true);
|
||||
testRegionReplicaReplicationIgnores(true, false);
|
||||
}
|
||||
|
||||
public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
|
||||
@Test(timeout = 240000)
|
||||
public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
|
||||
testRegionReplicaReplicationIgnores(false, true);
|
||||
}
|
||||
|
||||
public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
|
||||
throws Exception {
|
||||
// tests having edits from a disabled or dropped table is handled correctly by skipping those
|
||||
// entries and further edits after the edits from dropped/disabled table can be replicated
|
||||
// without problems.
|
||||
TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables"
|
||||
+ dropTable);
|
||||
+ "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
|
||||
HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
|
||||
int regionReplication = 3;
|
||||
htd.setRegionReplication(regionReplication);
|
||||
HTU.deleteTableIfAny(tableName);
|
||||
HTU.getHBaseAdmin().createTable(htd);
|
||||
TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
|
||||
TableName toBeDisabledTable = TableName.valueOf(
|
||||
dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
|
||||
HTU.deleteTableIfAny(toBeDisabledTable);
|
||||
htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
|
||||
htd.setRegionReplication(regionReplication);
|
||||
|
@ -371,14 +382,16 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
|
||||
mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
|
||||
when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
|
||||
FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
|
||||
FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
|
||||
RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
|
||||
new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
|
||||
(ClusterConnection) connection,
|
||||
Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
|
||||
Executors.newSingleThreadExecutor(), Integer.MAX_VALUE, fstd);
|
||||
|
||||
RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
|
||||
HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
|
||||
byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
|
||||
|
||||
Entry entry = new Entry(
|
||||
new WALKey(encodedRegionName, toBeDisabledTable, 1),
|
||||
new WALEdit());
|
||||
|
@ -386,13 +399,23 @@ public class TestRegionReplicaReplicationEndpoint {
|
|||
HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table
|
||||
if (dropTable) {
|
||||
HTU.getHBaseAdmin().deleteTable(toBeDisabledTable);
|
||||
} else if (disableReplication) {
|
||||
htd.setRegionReplication(regionReplication - 2);
|
||||
HTU.getHBaseAdmin().modifyTable(toBeDisabledTable, htd);
|
||||
HTU.getHBaseAdmin().enableTable(toBeDisabledTable);
|
||||
}
|
||||
|
||||
sinkWriter.append(toBeDisabledTable, encodedRegionName,
|
||||
HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
|
||||
|
||||
assertEquals(2, skippedEdits.get());
|
||||
|
||||
if (disableReplication) {
|
||||
// enable replication again so that we can verify replication
|
||||
HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table
|
||||
htd.setRegionReplication(regionReplication);
|
||||
HTU.getHBaseAdmin().modifyTable(toBeDisabledTable, htd);
|
||||
HTU.getHBaseAdmin().enableTable(toBeDisabledTable);
|
||||
}
|
||||
try {
|
||||
// load some data to the to-be-dropped table
|
||||
|
||||
|
|
Loading…
Reference in New Issue