diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 09980c0f82a..2072d6da3de 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -123,27 +123,30 @@ public class MetaTableAccessor { * The actual layout of meta should be encapsulated inside MetaTableAccessor methods, * and should not leak out of it (through Result objects, etc) * - * For replication serially, there are two column families "rep_barrier", "rep_position" whose - * row key is encodedRegionName. + * For replication serially, there are three column families "rep_barrier", "rep_position" and + * "rep_meta" whose row key is encodedRegionName. * rep_barrier:{seqid} => in each time a RS opens a region, it saves the open sequence * id in this region * rep_position:{peerid} => to save the max sequence id we have pushed for each peer - * rep_position:_TABLENAME_ => a special cell to save this region's table name, will used when + * rep_meta:_TABLENAME_ => a special cell to save this region's table name, will used when * we clean old data - * rep_position:_DAUGHTER_ => a special cell to present this region is split or merged, in this + * rep_meta:_DAUGHTER_ => a special cell to present this region is split or merged, in this * cell the value is merged encoded name or two split encoded names * separated by "," + * rep_meta:_PARENT_ => a special cell to present this region's parent region(s), in this + * cell the value is encoded name of one or two parent regions + * separated by "," */ private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class); private static final Log METALOG = LogFactory.getLog("org.apache.hadoop.hbase.META"); - // Save its daughter region(s) when split/merge - private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_"); + // Save its daughter/parent region(s) when split/merge + private static final byte[] daughterNameCq = Bytes.toBytes("_DAUGHTER_"); + private static final byte[] parentNameCq = Bytes.toBytes("_PARENT_"); // Save its table name because we only know region's encoded name - private static final String tableNamePeer = "_TABLENAME_"; - private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer); + private static final byte[] tableNameCq = Bytes.toBytes("_TABLENAME_"); static final byte [] META_REGION_PREFIX; static { @@ -1343,13 +1346,18 @@ public class MetaTableAccessor { byte[] seqBytes = Bytes.toBytes(seq); return new Put(encodedRegionName) .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes) - .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName); + .addImmutable(HConstants.REPLICATION_META_FAMILY, tableNameCq, tableName); } - public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) { - return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY, - daughterNamePosCq, value); + public static Put makeDaughterPut(byte[] encodedRegionName, byte[] value) { + return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_META_FAMILY, + daughterNameCq, value); + } + + public static Put makeParentPut(byte[] encodedRegionName, byte[] value) { + return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_META_FAMILY, + parentNameCq, value); } /** @@ -1374,7 +1382,7 @@ public class MetaTableAccessor { * @param puts Put to add to hbase:meta * @throws IOException */ - static void putToMetaTable(final Connection connection, final Put... puts) + public static void putToMetaTable(final Connection connection, final Put... puts) throws IOException { put(getMetaHTable(connection), Arrays.asList(puts)); } @@ -1674,10 +1682,10 @@ public class MetaTableAccessor { + HConstants.DELIMITER); Mutation[] mutations; if (saveBarrier) { - Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(), - Bytes.toBytes(mergedRegion.getEncodedName())); - Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(), - Bytes.toBytes(mergedRegion.getEncodedName())); + Put putBarrierA = makeDaughterPut(regionA.getEncodedNameAsBytes(), + mergedRegion.getEncodedNameAsBytes()); + Put putBarrierB = makeDaughterPut(regionB.getEncodedNameAsBytes(), + mergedRegion.getEncodedNameAsBytes()); mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB }; } else { mutations = new Mutation[] { putOfMerged, deleteA, deleteB }; @@ -1729,10 +1737,14 @@ public class MetaTableAccessor { Mutation[] mutations; if (saveBarrier) { - Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(), - Bytes - .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName())); - mutations = new Mutation[]{putParent, putA, putB, putBarrier}; + Put parentPut = makeDaughterPut(parent.getEncodedNameAsBytes(), + Bytes.toBytes(splitA.getEncodedName() + "," + splitB.getEncodedName())); + Put daughterPutA = makeParentPut(splitA.getEncodedNameAsBytes(), + parent.getEncodedNameAsBytes()); + Put daughterPutB = makeParentPut(splitB.getEncodedNameAsBytes(), + parent.getEncodedNameAsBytes()); + + mutations = new Mutation[]{putParent, putA, putB, parentPut, daughterPutA, daughterPutB}; } else { mutations = new Mutation[]{putParent, putA, putB}; } @@ -2112,14 +2124,9 @@ public class MetaTableAccessor { Result r = get(getMetaHTable(connection), get); Map map = new HashMap<>((int) (r.size() / 0.75 + 1)); for (Cell c : r.listCells()) { - if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(), - c.getQualifierOffset(), c.getQualifierLength()) && - !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(), - c.getQualifierOffset(), c.getQualifierLength())) { - map.put( - Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()), - Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength())); - } + map.put( + Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()), + Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength())); } return map; } @@ -2172,12 +2179,32 @@ public class MetaTableAccessor { /** * Get daughter region(s) for a region, only used in serial replication. + * @param connection connection we're using + * @param encodedName region's encoded name * @throws IOException */ public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName) throws IOException { Get get = new Get(encodedName); - get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq); + get.addColumn(HConstants.REPLICATION_META_FAMILY, daughterNameCq); + Result result = get(getMetaHTable(connection), get); + if (!result.isEmpty()) { + Cell c = result.rawCells()[0]; + return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength()); + } + return null; + } + + /** + * Get parent region(s) for a region, only used in serial replication. + * @param connection connection we're using + * @param encodedName region's encoded name + * @throws IOException + */ + public static String getSerialReplicationParentRegion(Connection connection, byte[] encodedName) + throws IOException { + Get get = new Get(encodedName); + get.addColumn(HConstants.REPLICATION_META_FAMILY, parentNameCq); Result result = get(getMetaHTable(connection), get); if (!result.isEmpty()) { Cell c = result.rawCells()[0]; @@ -2188,12 +2215,14 @@ public class MetaTableAccessor { /** * Get the table name for a region, only used in serial replication. + * @param connection connection we're using + * @param encodedName region's encoded name * @throws IOException */ public static String getSerialReplicationTableName(Connection connection, byte[] encodedName) throws IOException { Get get = new Get(encodedName); - get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq); + get.addColumn(HConstants.REPLICATION_META_FAMILY, tableNameCq); Result result = get(getMetaHTable(connection), get); if (!result.isEmpty()) { Cell c = result.rawCells()[0]; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 4f8faccb01f..3f9b430e868 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -436,13 +436,20 @@ public final class HConstants { public static final byte [] REPLICATION_BARRIER_FAMILY = Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR); - /** The replication barrier family as a string*/ + /** The replication position family as a string*/ public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position"; - /** The replication barrier family */ + /** The replication position family */ public static final byte [] REPLICATION_POSITION_FAMILY = Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR); + /** The replication meta family as a string*/ + public static final String REPLICATION_META_FAMILY_STR = "rep_meta"; + + /** The replication meta family */ + public static final byte [] REPLICATION_META_FAMILY = + Bytes.toBytes(REPLICATION_META_FAMILY_STR); + /** The RegionInfo qualifier as a string */ public static final String REGIONINFO_QUALIFIER_STR = "regioninfo"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java index e9647e8d06d..b133c56ca68 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java @@ -140,6 +140,7 @@ public class ReplicationMetaCleaner extends ScheduledChore { Delete delete = new Delete(encodedBytes); delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY); delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY); + delete.addFamily(HConstants.REPLICATION_META_FAMILY); try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) { metaTable.delete(delete); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index ace099ba837..89f7a054225 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.NonceGenerator; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.ConfigurationObserver; @@ -3052,7 +3053,25 @@ public class HRegionServer extends HasThread implements if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow; throw new IOException(exceptionToThrow); } - + if (parentRegion.getTableDesc().hasSerialReplicationScope()) { + // For serial replication, we need add a final barrier on this region. But the splitting may + // be reverted, so we should make sure if we reopen this region, the open barrier is same as + // this final barrier + long seq = parentRegion.getMaxFlushedSeqId(); + if (seq == HConstants.NO_SEQNUM) { + // No edits in WAL for this region; get the sequence number when the region was opened. + seq = parentRegion.getOpenSeqNum(); + if (seq == HConstants.NO_SEQNUM) { + // This region has no data + seq = 0; + } + } else { + seq++; + } + Put finalBarrier = MetaTableAccessor.makeBarrierPut(Bytes.toBytes(parentRegionEncodedName), + seq, parentRegion.getTableDesc().getTableName().getName()); + MetaTableAccessor.putToMetaTable(getConnection(), finalBarrier); + } // Offline the region this.removeFromOnlineRegions(parentRegion, null); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index bb403a438ad..388efbf9d30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -33,10 +32,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; @@ -58,9 +55,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -70,6 +64,9 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index b2f3b8da9e9..fa6f894fc4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -860,8 +860,8 @@ public class ReplicationSourceManager implements ReplicationListener { * It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the * order of logs that is written before altering. * 3) This entry is in the first interval of barriers. We can push them because it is the - * start of a region. Splitting/merging regions are also ok because the first section of - * daughter region is in same region of parents and the order in one RS is guaranteed. + * start of a region. But if the region is created by region split, we should check + * if the parent regions are fully pushed. * 4) If the entry's seq id and the position are in same section, or the pos is the last * number of previous section. Because when open a region we put a barrier the number * is the last log's id + 1. @@ -879,7 +879,40 @@ public class ReplicationSourceManager implements ReplicationListener { } if (interval == 1) { // Case 3 - return; + // Check if there are parent regions + String parentValue = MetaTableAccessor.getSerialReplicationParentRegion(connection, + encodedName); + if (parentValue == null) { + // This region has no parent or the parent's log entries are fully pushed. + return; + } + while (true) { + boolean allParentDone = true; + String[] parentRegions = parentValue.split(","); + for (String parent : parentRegions) { + byte[] region = Bytes.toBytes(parent); + long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, region, peerId); + List parentBarriers = MetaTableAccessor.getReplicationBarriers(connection, region); + if (parentBarriers.size() > 0 + && parentBarriers.get(parentBarriers.size() - 1) - 1 > pos) { + allParentDone = false; + // For a closed region, we will write a close event marker to WAL whose sequence id is + // larger than final barrier but still smaller than next region's openSeqNum. + // So if the pos is larger than last barrier, we can say we have read the event marker + // which means the parent region has been fully pushed. + LOG.info(Bytes.toString(encodedName) + " can not start pushing because parent region's" + + " log has not been fully pushed: parent=" + Bytes.toString(region) + " pos=" + pos + + " barriers=" + Arrays.toString(barriers.toArray())); + break; + } + } + if (allParentDone) { + return; + } else { + Thread.sleep(replicationWaitTime); + } + } + } while (true) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java index 81dadd94acd..a100a15f7b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java @@ -168,6 +168,18 @@ public class FSTableDescriptors implements TableDescriptors { // Enable cache of data blocks in L1 if more than one caching tier deployed: // e.g. if using CombinedBlockCache (BucketCache). .setCacheDataInL1(true), + new HColumnDescriptor(HConstants.REPLICATION_META_FAMILY) + .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS, + HConstants.DEFAULT_HBASE_META_VERSIONS)) + .setInMemory(true) + .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, + HConstants.DEFAULT_HBASE_META_BLOCK_SIZE)) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. + .setBloomFilterType(BloomType.NONE) + // Enable cache of data blocks in L1 if more than one caching tier deployed: + // e.g. if using CombinedBlockCache (BucketCache). + .setCacheDataInL1(true), new HColumnDescriptor(HConstants.TABLE_FAMILY) // Ten is arbitrary number. Keep versions to help debugging. .setMaxVersions(10) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java index 80c534aa8ee..2cb4eccaf7e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; @@ -112,7 +113,7 @@ public class TestSerialReplication { rpc.setClusterKey(utility2.getClusterKey()); admin1.addPeer("1", rpc, null); - utility1.startMiniCluster(1, 3); + utility1.startMiniCluster(1, 10); utility2.startMiniCluster(1, 1); utility1.getHBaseAdmin().setBalancerRunning(false, true); @@ -174,16 +175,10 @@ public class TestSerialReplication { return; } throw new Exception("Not all logs have been pushed"); - } finally { - utility1.getHBaseAdmin().disableTable(tableName); - utility2.getHBaseAdmin().disableTable(tableName); - utility1.getHBaseAdmin().deleteTable(tableName); - utility2.getHBaseAdmin().deleteTable(tableName); } } @Test - @Ignore public void testRegionSplit() throws Exception { TableName tableName = TableName.valueOf("testRegionSplit"); HTableDescriptor table = new HTableDescriptor(tableName); @@ -201,18 +196,12 @@ public class TestSerialReplication { t1.put(put); } utility1.getHBaseAdmin().split(tableName, ROWS[50]); - Thread.sleep(5000L); + waitTableHasRightNumberOfRegions(tableName, 2); for (int i = 11; i < 100; i += 10) { Put put = new Put(ROWS[i]); put.addColumn(famName, VALUE, VALUE); t1.put(put); } - balanceTwoRegions(t1); - for (int i = 12; i < 100; i += 10) { - Put put = new Put(ROWS[i]); - put.addColumn(famName, VALUE, VALUE); - t1.put(put); - } long start = EnvironmentEdgeManager.currentTime(); while (EnvironmentEdgeManager.currentTime() - start < 180000) { @@ -226,54 +215,40 @@ public class TestSerialReplication { } } List listOfNumbers = getRowNumbers(list); - List list0 = new ArrayList<>(); List list1 = new ArrayList<>(); List list21 = new ArrayList<>(); List list22 = new ArrayList<>(); for (int num : listOfNumbers) { if (num % 10 == 0) { - list0.add(num); - } else if (num % 10 == 1) { list1.add(num); - } else if (num < 50) { //num%10==2 + }else if (num < 50) { //num%10==1 list21.add(num); } else { // num%10==1&&num>50 list22.add(num); } } - LOG.info(Arrays.toString(list0.toArray())); LOG.info(Arrays.toString(list1.toArray())); LOG.info(Arrays.toString(list21.toArray())); LOG.info(Arrays.toString(list22.toArray())); - assertIntegerList(list0, 10, 10); - assertIntegerList(list1, 11, 10); - assertIntegerList(list21, 12, 10); - assertIntegerList(list22, 52, 10); - if (!list1.isEmpty()) { - assertEquals(9, list0.size()); - } + assertIntegerList(list1, 10, 10); + assertIntegerList(list21, 11, 10); + assertIntegerList(list22, 51, 10); if (!list21.isEmpty() || !list22.isEmpty()) { assertEquals(9, list1.size()); } - if (list.size() == 27) { + if (list.size() == 18) { return; } LOG.info("Waiting all logs pushed to slave. Expected 27 , actual " + list.size()); Thread.sleep(200); } throw new Exception("Not all logs have been pushed"); - } finally { - utility1.getHBaseAdmin().disableTable(tableName); - utility2.getHBaseAdmin().disableTable(tableName); - utility1.getHBaseAdmin().deleteTable(tableName); - utility2.getHBaseAdmin().deleteTable(tableName); } } @Test - @Ignore public void testRegionMerge() throws Exception { TableName tableName = TableName.valueOf("testRegionMerge"); HTableDescriptor table = new HTableDescriptor(tableName); @@ -282,8 +257,9 @@ public class TestSerialReplication { table.addFamily(fam); utility1.getHBaseAdmin().createTable(table); utility2.getHBaseAdmin().createTable(table); + Threads.sleep(5000); utility1.getHBaseAdmin().split(tableName, ROWS[50]); - Thread.sleep(5000L); + waitTableHasRightNumberOfRegions(tableName, 2); try(Table t1 = utility1.getConnection().getTable(tableName); Table t2 = utility2.getConnection().getTable(tableName)) { @@ -294,9 +270,9 @@ public class TestSerialReplication { } List> regions = MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), tableName); - assertEquals(2, regions.size()); utility1.getHBaseAdmin().mergeRegions(regions.get(0).getFirst().getRegionName(), regions.get(1).getFirst().getRegionName(), true); + waitTableHasRightNumberOfRegions(tableName, 1); for (int i = 11; i < 100; i += 10) { Put put = new Put(ROWS[i]); put.addColumn(famName, VALUE, VALUE); @@ -326,7 +302,6 @@ public class TestSerialReplication { } LOG.info(Arrays.toString(list0.toArray())); LOG.info(Arrays.toString(list1.toArray())); - assertIntegerList(list0, 10, 10); assertIntegerList(list1, 11, 10); if (!list1.isEmpty()) { assertEquals(9, list0.size()); @@ -338,11 +313,6 @@ public class TestSerialReplication { Thread.sleep(200); } - } finally { - utility1.getHBaseAdmin().disableTable(tableName); - utility2.getHBaseAdmin().disableTable(tableName); - utility1.getHBaseAdmin().deleteTable(tableName); - utility2.getHBaseAdmin().deleteTable(tableName); } } @@ -370,10 +340,13 @@ public class TestSerialReplication { ServerName name = utility1.getHBaseCluster().getRegionServer(index).getServerName(); utility1.getAdmin() .move(regionInfo.getEncodedNameAsBytes(), Bytes.toBytes(name.getServerName())); - try { - Thread.sleep(5000L); // wait to complete - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + while (true) { + regions = + MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName()); + if (regions.get(0).getSecond().equals(name)) { + break; + } + Threads.sleep(100); } } @@ -387,16 +360,34 @@ public class TestSerialReplication { ServerName name2 = utility1.getHBaseCluster().getRegionServer(1).getServerName(); utility1.getAdmin() .move(regionInfo1.getEncodedNameAsBytes(), Bytes.toBytes(name1.getServerName())); - Thread.sleep(5000L); utility1.getAdmin() .move(regionInfo2.getEncodedNameAsBytes(), Bytes.toBytes(name2.getServerName())); - Thread.sleep(5000L); + while (true) { + regions = + MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName()); + if (regions.get(0).getSecond().equals(name1) && regions.get(1).getSecond().equals(name2)) { + break; + } + Threads.sleep(100); + } + } + + private void waitTableHasRightNumberOfRegions(TableName tableName, int num) throws IOException { + while (true) { + List> regions = + MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), tableName); + if (regions.size() == num) { + return; + } + Threads.sleep(100); + } + } private void assertIntegerList(List list, int start, int step) { int size = list.size(); for (int i = 0; i < size; i++) { - assertTrue(list.get(i) == start + step * i); + assertEquals(start + step * i, list.get(i).intValue()); } } }