From 59230cb1847923c98c7d08deb8b8b56830ab785a Mon Sep 17 00:00:00 2001 From: stack Date: Mon, 25 Aug 2014 18:16:55 -0700 Subject: [PATCH] HBASE-11610 Enhance remote meta updates (Virag Kothari) --- .../client/RpcRetryingCallerFactory.java | 7 +- .../hadoop/hbase/master/RegionStateStore.java | 40 ++++++------ .../TestAssignmentManagerOnCluster.java | 64 ++++++++++++++++++- 3 files changed, 87 insertions(+), 24 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 7957cc8f4fb..9b070a57b33 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -47,9 +47,12 @@ public class RpcRetryingCallerFactory { } public static RpcRetryingCallerFactory instantiate(Configuration configuration) { + String clazzName = RpcRetryingCallerFactory.class.getName(); String rpcCallerFactoryClazz = - configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, - RpcRetryingCallerFactory.class.getName()); + configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName); + if (rpcCallerFactoryClazz.equals(clazzName)) { + return new RpcRetryingCallerFactory(configuration); + } return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz, new Class[] { Configuration.class }, new Object[] { configuration }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java index 8e1e040742d..26a31ab0146 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,8 +33,6 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.master.RegionState.State; @@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; +import org.apache.hadoop.hbase.util.MultiHConnection; import com.google.common.base.Preconditions; @@ -56,7 +56,7 @@ public class RegionStateStore { protected static final char META_REPLICA_ID_DELIMITER = '_'; private volatile HRegion metaRegion; - private volatile HTableInterface metaTable; + private MultiHConnection multiHConnection; private volatile boolean initialized; private final boolean noPersistence; @@ -139,7 +139,6 @@ public class RegionStateStore { initialized = false; } - @SuppressWarnings("deprecation") void start() throws IOException { if (!noPersistence) { if (server instanceof RegionServerServices) { @@ -147,8 +146,13 @@ public class RegionStateStore { HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); } if (metaRegion == null) { - metaTable = new HTable(TableName.META_TABLE_NAME, - server.getShortCircuitConnection()); + Configuration conf = server.getConfiguration(); + // Config to determine the no of HConnections to META. + // A single HConnection should be sufficient in most cases. Only if + // you are doing lot of writes (>1M) to META, + // increasing this value might improve the write throughput. + multiHConnection = + new MultiHConnection(conf, conf.getInt("hbase.regionstatestore.meta.connection", 1)); } } initialized = true; @@ -156,18 +160,11 @@ public class RegionStateStore { void stop() { initialized = false; - if (metaTable != null) { - try { - metaTable.close(); - } catch (IOException e) { - LOG.info("Got exception in closing meta table", e); - } finally { - metaTable = null; - } + if (multiHConnection != null) { + multiHConnection.close(); } } - @SuppressWarnings("deprecation") void updateRegionState(long openSeqNum, RegionState newState, RegionState oldState) { if (noPersistence || !initialized) { @@ -219,22 +216,23 @@ public class RegionStateStore { synchronized (this) { if (metaRegion != null) { LOG.info("Meta region shortcut failed", t); - metaTable = new HTable(TableName.META_TABLE_NAME, - server.getShortCircuitConnection()); + if (multiHConnection == null) { + multiHConnection = new MultiHConnection(server.getConfiguration(), 1); + } metaRegion = null; } } } } - synchronized(metaTable) { - metaTable.put(put); - } + // Called when meta is not on master + multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null); + } catch (IOException ioe) { LOG.error("Failed to persist region state " + newState, ioe); server.abort("Failed to update region location", ioe); } } - + void splitRegion(HRegionInfo p, HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException { MetaTableAccessor.splitRegion(server.getShortCircuitConnection(), p, a, b, sn); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java index ffc24e44823..0ee6037881d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java @@ -51,6 +51,8 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -1079,7 +1081,67 @@ public class TestAssignmentManagerOnCluster { TEST_UTIL.deleteTable(Bytes.toBytes(table)); } } - + + /** + * Test concurrent updates to meta when meta is not on master + * @throws Exception + */ + @Test(timeout = 30000) + public void testUpdatesRemoteMeta() throws Exception { + // Not for zk less assignment + if (conf.getBoolean("hbase.assignment.usezk", true)) { + return; + } + conf.setInt("hbase.regionstatestore.meta.connection", 3); + final RegionStateStore rss = + new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager())); + rss.start(); + // Create 10 threads and make each do 10 puts related to region state update + Thread[] th = new Thread[10]; + List nameList = new ArrayList(); + List tableNameList = new ArrayList(); + for (int i = 0; i < th.length; i++) { + th[i] = new Thread() { + @Override + public void run() { + HRegionInfo[] hri = new HRegionInfo[10]; + ServerName serverName = ServerName.valueOf("dummyhost", 1000, 1234); + for (int i = 0; i < 10; i++) { + hri[i] = new HRegionInfo(TableName.valueOf(Thread.currentThread().getName() + "_" + i)); + RegionState newState = new RegionState(hri[i], RegionState.State.OPEN, serverName); + RegionState oldState = + new RegionState(hri[i], RegionState.State.PENDING_OPEN, serverName); + rss.updateRegionState(1, newState, oldState); + } + } + }; + th[i].start(); + nameList.add(th[i].getName()); + } + for (int i = 0; i < th.length; i++) { + th[i].join(); + } + // Add all the expected table names in meta to tableNameList + for (String name : nameList) { + for (int i = 0; i < 10; i++) { + tableNameList.add(TableName.valueOf(name + "_" + i)); + } + } + List metaRows = MetaTableAccessor.fullScanOfMeta(admin.getConnection()); + int count = 0; + // Check all 100 rows are in meta + for (Result result : metaRows) { + if (tableNameList.contains(HRegionInfo.getTable(result.getRow()))) { + count++; + if (count == 100) { + break; + } + } + } + assertTrue(count == 100); + rss.stop(); + } + static class MyLoadBalancer extends StochasticLoadBalancer { // For this region, if specified, always assign to nowhere static volatile String controledRegion = null;