diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
index ff414be9777..cb006756f8e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Collection;
@@ -49,6 +50,14 @@ extends RetriesExhaustedException {
   List<Row> actions;
   List<String> hostnameAndPort;
 
+  public RetriesExhaustedWithDetailsException(final String msg) {
+    super(msg);
+  }
+
+  public RetriesExhaustedWithDetailsException(final String msg, final IOException e) {
+    super(msg, e);
+  }
+
   public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
                                               List<Row> actions,
                                               List<String> hostnameAndPort) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index f8195f1acd8..d86fc622003 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -253,6 +253,21 @@ public final class ProtobufUtil {
     return makeIOExceptionOfException(se);
   }
 
+  /**
+   * Return the Exception thrown by the remote server wrapped in
+   * ServiceException as cause. RemoteExceptions are left untouched.
+   *
+   * @param e ServiceException that wraps the IO exception thrown by the server
+   * @return Exception wrapped in ServiceException.
+   */
+  public static IOException getServiceException(org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException e) {
+    Throwable t = e.getCause();
+    if (ExceptionUtil.isInterrupt(t)) {
+      return ExceptionUtil.asInterrupt(t);
+    }
+    return t instanceof IOException ? (IOException) t : new HBaseIOException(t);
+  }
+
   /**
    * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
   * just {@link ServiceException}. Prefer this method to
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index cc9fc57c056..834e5bbea2c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1200,6 +1200,11 @@ public final class HConstants {
   public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
       "hbase.replication.source.maxthreads";
 
+  /** Drop edits for tables that have been deleted from the replication source and target */
+  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
+      "hbase.replication.drop.on.deleted.table";
+
+  /** Maximum number of threads used by the replication source for shipping edits to the sinks */
   public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
 
   /** Configuration key for SplitLog manager timeout */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index dbf7b5e3863..af9690a6594 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -69,7 +69,7 @@ public class ReplicationProtbufUtil {
     try {
       admin.replicateWALEntry(controller, p.getFirst());
     } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException e) {
-      throw ProtobufUtil.handleRemoteException(e);
+      throw ProtobufUtil.getServiceException(e);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index ffdba347130..401da4c2cf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -53,6 +53,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
 
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Context {
+    private final Configuration localConf;
     private final Configuration conf;
     private final FileSystem fs;
     private final TableDescriptors tableDescriptors;
@@ -64,6 +65,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
 
     @InterfaceAudience.Private
     public Context(
+        final Configuration localConf,
         final Configuration conf,
         final FileSystem fs,
         final String peerId,
@@ -72,6 +74,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
         final MetricsSource metrics,
         final TableDescriptors tableDescriptors,
         final Abortable abortable) {
+      this.localConf = localConf;
       this.conf = conf;
       this.fs = fs;
       this.clusterId = clusterId;
@@ -84,6 +87,9 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
     public Configuration getConfiguration() {
       return conf;
     }
+    public Configuration getLocalConfiguration() {
+      return localConf;
+    }
     public FileSystem getFilesystem() {
       return fs;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index a210aaeb87a..c1ed64413db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -36,6 +36,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
@@ -45,9 +47,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
@@ -79,6 +83,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
   private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
 
   private ClusterConnection conn;
+  private Configuration localConf;
   private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
@@ -102,11 +107,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
   private Abortable abortable;
+  private boolean dropOnDeletedTables;
 
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
     this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
     decorateConf();
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -139,6 +146,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
     // conservative for now.
     this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
       RpcServer.DEFAULT_MAX_REQUEST_SIZE));
+    this.dropOnDeletedTables =
+        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -225,6 +234,37 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
     return entryLists;
   }
 
+  private TableName parseTable(String msg) {
+    // ... TableNotFoundException: '<table>'/n...
+    Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'");
+    Matcher m = p.matcher(msg);
+    if (m.find()) {
+      String table = m.group(1);
+      try {
+        // double check that table is a valid table name
+        TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
+        return TableName.valueOf(table);
+      } catch (IllegalArgumentException ignore) {
+      }
+    }
+    return null;
+  }
+
+  // Filter a set of batches by TableName
+  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
+    List<List<Entry>> entryLists = new ArrayList<>();
+    for (List<Entry> entries : oldEntryList) {
+      ArrayList<Entry> thisList = new ArrayList<Entry>(entries.size());
+      entryLists.add(thisList);
+      for (Entry e : entries) {
+        if (!e.getKey().getTablename().equals(table)) {
+          thisList.add(e);
+        }
+      }
+    }
+    return entryLists;
+  }
+
   private void reconnectToPeerCluster() {
     ClusterConnection connection = null;
     try {
@@ -325,10 +365,27 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
             ioe = ((RemoteException) ioe).unwrapRemoteException();
             LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
             if (ioe instanceof TableNotFoundException) {
-              if (sleepForRetries("A table is missing in the peer cluster. "
-                  + "Replication cannot proceed without losing data.", sleepMultiplier)) {
-                sleepMultiplier++;
+              if (dropOnDeletedTables) {
+                // this is a bit fragile, but cannot change how TNFE is serialized
+                // at least check whether the table name is legal
+                TableName table = parseTable(ioe.getMessage());
+                if (table != null) {
+                  try (Connection localConn =
+                      ConnectionFactory.createConnection(ctx.getLocalConfiguration())) {
+                    if (!localConn.getAdmin().tableExists(table)) {
+                      // Would potentially be better to retry in one of the outer loops
+                      // and add a table filter there; but that would break the encapsulation,
+                      // so we're doing the filtering here.
+                      LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '"+table+"'");
+                      batches = filterBatches(batches, table);
+                      continue;
+                    }
+                  } catch (IOException iox) {
+                    LOG.warn("Exception checking for local table: ", iox);
+                  }
+                }
               }
+              // fall through and sleep below
             } else {
               LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
               replicationSinkMgr.chooseSinks();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 05b1f218b1a..2f9f9c5c9eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -42,12 +42,14 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
@@ -372,6 +374,12 @@ public class ReplicationSink {
       for (List<Row> rows : allRows) {
         table.batch(rows, null);
       }
+    } catch (RetriesExhaustedWithDetailsException rewde) {
+      for (Throwable ex : rewde.getCauses()) {
+        if (ex instanceof TableNotFoundException) {
+          throw new TableNotFoundException("'"+tableName+"'");
+        }
+      }
     } catch (InterruptedException ix) {
       throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
     } finally {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 609274f3262..45d7d94c238 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -507,7 +507,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       replicationEndpoint, walFileLengthProvider, metrics);
 
     // init replication endpoint
-    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
+    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
       fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
 
     return src;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
new file mode 100644
index 00000000000..df9cff25ece
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Category(LargeTests.class)
+public class TestReplicationDroppedTables extends TestReplicationBase {
+  private static final Log LOG = LogFactory.getLog(TestReplicationDroppedTables.class);
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for ( JVMClusterUtil.RegionServerThread r :
+        utility1.getHBaseCluster().getRegionServerThreads()) {
+      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    int rowCount = utility1.countRows(tableName);
+    utility1.deleteTableData(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call deleteTableData on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(rowCount);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsStuckBehindDroppedTable() throws Exception {
+    // Sanity check
+    // Make sure by default edits for dropped tables stall the replication queue, even when the
+    // table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(false, "test_dropped");
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsDroppedWithDroppedTable() throws Exception {
+    // Make sure that, with the drop-on-deleted-table flag enabled, edits for dropped tables
+    // are themselves dropped when the table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(true, "test_dropped");
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsDroppedWithDroppedTableNS() throws Exception {
+    // also try with a namespace
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createNamespace(NamespaceDescriptor.create("NS").build());
+    }
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createNamespace(NamespaceDescriptor.create("NS").build());
+    }
+    testEditsBehindDroppedTable(true, "NS:test_dropped");
+  }
+
+  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+
+    // make sure we have a single region server only, so that all
+    // edits for all tables go there
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster(1, 1);
+
+    TableName tablename = TableName.valueOf(tName);
+    byte[] familyname = Bytes.toBytes("fam");
+    byte[] row = Bytes.toBytes("row");
+
+    HTableDescriptor table = new HTableDescriptor(tablename);
+    HColumnDescriptor fam = new HColumnDescriptor(familyname);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createTable(table);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createTable(table);
+    }
+    utility1.waitUntilAllRegionsAssigned(tablename);
+    utility2.waitUntilAllRegionsAssigned(tablename);
+
+    Table lHtable1 = utility1.getConnection().getTable(tablename);
+
+    // now suspend replication
+    admin.disablePeer("2");
+
+    // put some data (lead with 0 so the edit gets sorted before the other table's edits
+    // in the replication batch)
+    // write a bunch of edits, making sure we fill a batch
+    byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
+    Put put = new Put(rowkey);
+    put.addColumn(familyname, row, row);
+    lHtable1.put(put);
+
+    rowkey = Bytes.toBytes("normal put");
+    put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.disableTable(tablename);
+      admin1.deleteTable(tablename);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.disableTable(tablename);
+      admin2.deleteTable(tablename);
+    }
+
+    admin.enablePeer("2");
+    if (allowProceeding) {
+      // in this case we'd expect the key to make it over
+      verifyReplicationProceeded(rowkey);
+    } else {
+      verifyReplicationStuck(rowkey);
+    }
+    // just to be safe
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsBehindDroppedTableTiming() throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+
+    // make sure we have a single region server only, so that all
+    // edits for all tables go there
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster(1, 1);
+
+    TableName tablename = TableName.valueOf("testdroppedtimed");
+    byte[] familyname = Bytes.toBytes("fam");
+    byte[] row = Bytes.toBytes("row");
+
+    HTableDescriptor table = new HTableDescriptor(tablename);
+    HColumnDescriptor fam = new HColumnDescriptor(familyname);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createTable(table);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createTable(table);
+    }
+    utility1.waitUntilAllRegionsAssigned(tablename);
+    utility2.waitUntilAllRegionsAssigned(tablename);
+
+    Table lHtable1 = utility1.getConnection().getTable(tablename);
+
+    // now suspend replication
+    admin.disablePeer("2");
+
+    // put some data (lead with 0 so the edit gets sorted before the other table's edits
+    // in the replication batch)
+    // write a bunch of edits, making sure we fill a batch
+    byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
+    Put put = new Put(rowkey);
+    put.addColumn(familyname, row, row);
+    lHtable1.put(put);
+
+    rowkey = Bytes.toBytes("normal put");
+    put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.disableTable(tablename);
+      admin2.deleteTable(tablename);
+    }
+
+    admin.enablePeer("2");
+    // edit should still be stuck
+
+    try (Admin admin1 = connection1.getAdmin()) {
+      // the source table still exists, replication should be stalled
+      verifyReplicationStuck(rowkey);
+
+      admin1.disableTable(tablename);
+      // still stuck, source table still exists
+      verifyReplicationStuck(rowkey);
+
+      admin1.deleteTable(tablename);
+      // now the source table is gone, replication should proceed, the
+      // offending edits be dropped
+      verifyReplicationProceeded(rowkey);
+    }
+    // just to be safe
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+  }
+
+  private void verifyReplicationProceeded(byte[] rowkey) throws Exception {
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.getRow(), rowkey);
+        break;
+      }
+    }
+  }
+
+  private void verifyReplicationStuck(byte[] rowkey) throws Exception {
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Edit should have been stuck behind dropped tables");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+}
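
Note on enabling the new behavior: the key added to HConstants above defaults to false, so existing deployments keep the old "stall until the table reappears" semantics. A minimal sketch of opting in on the source cluster, mirroring what the test does with conf1 (whether it is set in hbase-site.xml or programmatically is deployment-specific):

  // Sketch only, not part of the patch: enable dropping edits for tables that have been
  // deleted on both the source and the sink. The key string matches
  // HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY introduced in this change.
  Configuration conf = HBaseConfiguration.create();
  conf.setBoolean("hbase.replication.drop.on.deleted.table", true);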