From eb906e20eeee76e3544ccd403f1d3a264c82a1e9 Mon Sep 17 00:00:00 2001
From: Ankit Singhal
Date: Thu, 19 Jul 2018 14:58:59 -0700
Subject: [PATCH] HBASE-20908 Infinite loop on regionserver if region replicas
 are reduced

Signed-off-by: tedyu
---
 .../RegionReplicaReplicationEndpoint.java     | 34 +++++++++---
 .../TestRegionReplicaReplicationEndpoint.java | 53 +++++++++++++++----
 2 files changed, 68 insertions(+), 19 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index cb755feccf0..dc83eb7c132 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -31,7 +31,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
@@ -70,8 +69,10 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 
@@ -276,7 +277,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
         int numWriters, int operationTimeout) {
       super(controller, entryBuffers, numWriters);
-      this.sinkWriter = new RegionReplicaSinkWriter(this, connection, pool, operationTimeout);
+      this.sinkWriter =
+          new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
       this.tableDescriptors = tableDescriptors;
 
       // A cache for the table "memstore replication enabled" flag.
@@ -390,9 +392,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     int operationTimeout;
     ExecutorService pool;
     Cache<TableName, Boolean> disabledAndDroppedTables;
+    TableDescriptors tableDescriptors;
 
     public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
-        ExecutorService pool, int operationTimeout) {
+        ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
       this.sink = sink;
       this.connection = connection;
       this.operationTimeout = operationTimeout;
@@ -400,6 +403,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
       this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
       this.pool = pool;
+      this.tableDescriptors = tableDescriptors;
 
       int nonExistentTableCacheExpiryMs = connection.getConfiguration()
         .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
@@ -506,13 +510,14 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       }
 
       boolean tasksCancelled = false;
-      for (Future<ReplicateWALEntryResponse> task : tasks) {
+      for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
         try {
-          task.get();
+          tasks.get(replicaId).get();
         } catch (InterruptedException e) {
           throw new InterruptedIOException(e.getMessage());
         } catch (ExecutionException e) {
           Throwable cause = e.getCause();
+          boolean canBeSkipped = false;
           if (cause instanceof IOException) {
             // The table can be disabled or dropped at this time. For disabled tables, we have no
             // cheap mechanism to detect this case because meta does not contain this information.
@@ -520,21 +525,34 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
             // RPC. So instead we start the replay RPC with retries and check whether the table is
             // dropped or disabled which might cause SocketTimeoutException, or
             // RetriesExhaustedException or similar if we get IOE.
-            if (cause instanceof TableNotFoundException || connection.isTableDisabled(tableName)) {
+            if (cause instanceof TableNotFoundException
+                || connection.isTableDisabled(tableName)) {
+              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
+              canBeSkipped = true;
+            } else if (tableDescriptors != null) {
+              TableDescriptor tableDescriptor = tableDescriptors.get(tableName);
+              if (tableDescriptor != null
+                  // (replicaId + 1) because no replay task is submitted for the primary replica
+                  && tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
+                canBeSkipped = true;
+              }
+            }
+            if (canBeSkipped) {
               if (LOG.isTraceEnabled()) {
                 LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
-                  + " because received exception for dropped or disabled table", cause);
+                    + " because received exception for dropped or disabled table",
+                  cause);
                 for (Entry entry : entries) {
                   LOG.trace("Skipping : " + entry);
                 }
               }
-              disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
               if (!tasksCancelled) {
                 sink.getSkippedEditsCounter().addAndGet(entries.size());
                 tasksCancelled = true; // so that we do not add to skipped counter again
               }
               continue;
             }
+            // otherwise rethrow
             throw (IOException)cause;
           }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 61a1fbfc569..04db81a6df8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -29,6 +29,11 @@ import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -50,6 +55,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -263,7 +270,7 @@ public class TestRegionReplicaReplicationEndpoint {
     for (int i = 1; i < regionReplication; i++) {
       final Region region = regions[i];
       // wait until all the data is replicated to all secondary regions
-      Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() {
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
         @Override
         public boolean evaluate() throws Exception {
           LOG.info("verifying replication for region replica:" + region.getRegionInfo());
@@ -342,7 +349,6 @@ public class TestRegionReplicaReplicationEndpoint {
 
     Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
     Table table = connection.getTable(tableName);
-
     try {
       // load the data to the table
@@ -364,26 +370,35 @@ public class TestRegionReplicaReplicationEndpoint {
 
   @Test
   public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(false);
+    testRegionReplicaReplicationIgnores(false, false);
   }
 
   @Test
   public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
-    testRegionReplicaReplicationIgnoresDisabledTables(true);
+    testRegionReplicaReplicationIgnores(true, false);
   }
 
-  public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
+  @Test
+  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
+    testRegionReplicaReplicationIgnores(false, true);
+  }
+
+  public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
       throws Exception {
+
     // tests that edits from a disabled or dropped table are handled correctly by skipping those
     // entries, and that further edits after the edits from the
     // dropped/disabled table can be replicated without problems.
-    final TableName tableName = TableName.valueOf(name.getMethodName() + dropTable);
+    final TableName tableName = TableName.valueOf(
+      name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
     HTableDescriptor htd = HTU.createTableDescriptor(tableName);
     int regionReplication = 3;
     htd.setRegionReplication(regionReplication);
     HTU.deleteTableIfAny(tableName);
+
     HTU.getAdmin().createTable(htd);
-    TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
+    TableName toBeDisabledTable = TableName.valueOf(
+      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
     HTU.deleteTableIfAny(toBeDisabledTable);
     htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
     htd.setRegionReplication(regionReplication);
@@ -405,28 +420,44 @@
     RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
       mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
     when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
+    FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
+      FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
     RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
       new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
-        (ClusterConnection) connection,
-        Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
+        (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
+        fstd);
     RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
     HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
     byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
 
+    Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
+      .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
     Entry entry = new Entry(
       new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
-      new WALEdit());
+      new WALEdit()
+        .add(cell));
 
     HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
     if (dropTable) {
       HTU.getAdmin().deleteTable(toBeDisabledTable);
+    } else if (disableReplication) {
+      htd.setRegionReplication(regionReplication - 2);
+      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getAdmin().enableTable(toBeDisabledTable);
     }
-
     sinkWriter.append(toBeDisabledTable, encodedRegionName, HConstants.EMPTY_BYTE_ARRAY,
       Lists.newArrayList(entry, entry));
     assertEquals(2, skippedEdits.get());
+    if (disableReplication) {
+      // enable replication again so that we can verify replication
+      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
+      htd.setRegionReplication(regionReplication);
+      HTU.getAdmin().modifyTable(toBeDisabledTable, htd);
+      HTU.getAdmin().enableTable(toBeDisabledTable);
+    }
+
     try {
       // load some data to the to-be-dropped table
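
The crux of the fix is the new canBeSkipped branch above: replay tasks are submitted only for secondary replicas, so the task at index replicaId targets replica (replicaId + 1), and a failed task can be dropped once the table's current region replication no longer covers that replica; previously the IOException was always rethrown and the same edits were retried forever. The standalone sketch below (plain Java, no HBase dependencies; the class and method names are illustrative and not part of this patch) mirrors that decision:

// Standalone sketch of the skip decision introduced by this patch; illustrative
// only, not HBase code. canSkipFailedReplay is a hypothetical name.
public class ReplicaSkipCheckSketch {

  // Mirrors the canBeSkipped logic in RegionReplicaSinkWriter#append: a failed
  // replay to a secondary replica is skippable when the table was dropped or
  // disabled, or when region replication was reduced so that the target replica
  // no longer exists. currentRegionReplication is null when no table descriptor
  // could be loaded (the tableDescriptors == null case in the patch).
  static boolean canSkipFailedReplay(boolean tableNotFound, boolean tableDisabled,
      Integer currentRegionReplication, int replicaId) {
    if (tableNotFound || tableDisabled) {
      return true; // dropped or disabled table: cache the fact and skip the edits
    }
    // (replicaId + 1) because no replay task is submitted for the primary replica
    if (currentRegionReplication != null && currentRegionReplication <= replicaId + 1) {
      return true; // replica removed after region replication was reduced
    }
    return false; // genuine failure: rethrow so the edits are retried
  }

  public static void main(String[] args) {
    // Region replication reduced from 3 to 1: the replay tasks for replicas 1
    // and 2 keep failing. Before this patch those edits were retried forever
    // (the infinite loop of HBASE-20908); with the check they are skipped.
    for (int replicaId = 0; replicaId < 2; replicaId++) {
      System.out.println("task " + replicaId + " -> replica " + (replicaId + 1)
          + " skippable: " + canSkipFailedReplay(false, false, 1, replicaId));
    }
  }
}

Run as-is, the sketch reports both failed tasks as skippable when region replication has been reduced from 3 to 1, which is the situation the new testRegionReplicaReplicationIgnoresNonReplicatedTables test asserts through the skippedEdits counter.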