diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index a0adc04a7fb..90f6a90d9f0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -18,7 +18,35 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.protobuf.ServiceException;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -65,34 +93,7 @@ import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.protobuf.ServiceException;
 
 /**
  * A non-instantiable class that manages {@link HConnection}s.
@@ -932,7 +933,7 @@ public class HConnectionManager {
     throws IOException {
       return locateRegions (tableName, false, true);
     }
-    
+
     @Override
     public List<HRegionLocation> locateRegions(final byte[] tableName, final boolean useCache,
         final boolean offlined) throws IOException {
@@ -2087,7 +2088,9 @@
         for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
           for (Action<R> action : actions) {
             Row row = action.getAction();
-            hci.updateCachedLocations(tableName, row, exception, currentTask.getSecond());
+            // Do not use the exception for updating cache because it might be coming from
+            // any of the regions in the MultiAction.
+            hci.updateCachedLocations(tableName, row, null, currentTask.getSecond());
             if (noRetry) {
               errors.add(exception, row, currentTask);
             } else {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 302489f9077..23b367694fb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -17,15 +17,22 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
+import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
@@ -107,22 +114,15 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableSet;
-
-import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
 
 /**
  * Protobufs utility.
@@ -1021,37 +1021,43 @@
    */
   public static <R> MultiResponse multi(final ClientProtocol client, final MultiAction<R> multi)
   throws IOException {
-    try {
-      MultiResponse response = new MultiResponse();
-      for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
-        byte[] regionName = e.getKey();
-        int rowMutations = 0;
-        List<Action<R>> actions = e.getValue();
-        for (Action<R> action: actions) {
-          Row row = action.getAction();
-          if (row instanceof RowMutations) {
+    MultiResponse response = new MultiResponse();
+    for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
+      byte[] regionName = e.getKey();
+      int rowMutations = 0;
+      List<Action<R>> actions = e.getValue();
+      for (Action<R> action: actions) {
+        Row row = action.getAction();
+        if (row instanceof RowMutations) {
+          try {
             MultiRequest request =
-              RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
+                RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
             client.multi(null, request);
             response.add(regionName, action.getOriginalIndex(), new Result());
-            rowMutations++;
-          }
-        }
-        if (actions.size() > rowMutations) {
-          MultiRequest request =
-            RequestConverter.buildMultiRequest(regionName, actions);
-          ClientProtos.MultiResponse proto = client.multi(null, request);
-          List<Object> results = ResponseConverter.getResults(proto);
-          for (int i = 0, n = results.size(); i < n; i++) {
-            int originalIndex = actions.get(i).getOriginalIndex();
-            response.add(regionName, originalIndex, results.get(i));
+          } catch (ServiceException se) {
+            response.add(regionName, action.getOriginalIndex(), getRemoteException(se));
           }
+          rowMutations++;
+        }
+      }
+      if (actions.size() > rowMutations) {
+        Exception ex = null;
+        List<Object> results = null;
+        try {
+          MultiRequest request =
+              RequestConverter.buildMultiRequest(regionName, actions);
+          ClientProtos.MultiResponse proto = client.multi(null, request);
+          results = ResponseConverter.getResults(proto);
+        } catch (ServiceException se) {
+          ex = getRemoteException(se);
+        }
+        for (int i = 0, n = actions.size(); i < n; i++) {
+          int originalIndex = actions.get(i).getOriginalIndex();
+          response.add(regionName, originalIndex, results == null ? ex : results.get(i));
        }
       }
-      return response;
-    } catch (ServiceException se) {
-      throw getRemoteException(se);
     }
+    return response;
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 47b90491c0b..5355c98597d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
 import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -2441,7 +2442,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   public void setFileSystemURI(String fsURI) {
     FS_URI = fsURI;
   }
-  
+
   /**
    * Wrapper method for {@link Waiter#waitFor(Configuration, long, Predicate)}.
   */
@@ -2465,4 +2466,19 @@
       boolean failIfTimeout, Predicate<E> predicate) throws E {
     return Waiter.waitFor(this.conf, timeout, interval, failIfTimeout, predicate);
   }
+
+  /**
+   * Returns a {@link Predicate} for checking that there are no regions in transition in master
+   */
+  public Waiter.Predicate<Exception> predicateNoRegionsInTransition() {
+    return new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        final RegionStates regionStates = getMiniHBaseCluster().getMaster()
+            .getAssignmentManager().getRegionStates();
+        return !regionStates.isRegionsInTransition();
+      }
+    };
+  }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index e80f2080154..4f10c563327 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -18,10 +18,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -38,11 +38,17 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -52,6 +58,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+
 /**
  * This class is for testing HCM features
  */
@@ -62,8 +70,10 @@ public class TestHCM {
   private static final byte[] TABLE_NAME = Bytes.toBytes("test");
   private static final byte[] TABLE_NAME1 = Bytes.toBytes("test1");
   private static final byte[] TABLE_NAME2 = Bytes.toBytes("test2");
+  private static final byte[] TABLE_NAME3 = Bytes.toBytes("test3");
   private static final byte[] FAM_NAM = Bytes.toBytes("f");
   private static final byte[] ROW = Bytes.toBytes("bbb");
+  private static final byte[] ROW_X = Bytes.toBytes("xxx");
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -106,14 +116,14 @@
   private static int getHConnectionManagerCacheSize(){
     return HConnectionTestingUtility.getConnectionCount();
   }
-  
+
   @Test
   public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
     // Save off current HConnections
-    Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances = 
+    Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
         new HashMap<HConnectionKey, HConnectionImplementation>();
     oldHBaseInstances.putAll(HConnectionManager.HBASE_INSTANCES);
-    
+
     HConnectionManager.HBASE_INSTANCES.clear();
 
     try {
@@ -536,6 +546,120 @@
     conn.close();
   }
 
+  @Test
+  public void testMulti() throws Exception {
+    HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
+    TEST_UTIL.createMultiRegions(table, FAM_NAM);
+    HConnectionManager.HConnectionImplementation conn =
+        (HConnectionManager.HConnectionImplementation)
+        HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+
+    // We're now going to move the region and check that it works for the client
+    // First a new put to add the location in the cache
+    conn.clearRegionCache(TABLE_NAME3);
+    Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
+
+    TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+
+    // We can wait for all regions to be online, that makes log reading easier when debugging
+    while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+      Thread.sleep(1);
+    }
+
+    Put put = new Put(ROW_X);
+    put.add(FAM_NAM, ROW_X, ROW_X);
+    table.put(put);
+
+    // Now moving the region to the second server
+    HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X);
+    byte[] regionName = toMove.getRegionInfo().getRegionName();
+    byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
+
+    // Choose the other server.
+    int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
+    int destServerId = (curServerId == 0 ? 1 : 0);
+
+    HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
+    HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
+
+    ServerName destServerName = destServer.getServerName();
+
+    // find another row in the cur server that is less than ROW_X
+    List<HRegion> regions = curServer.getOnlineRegions(TABLE_NAME3);
+    byte[] otherRow = null;
+    for (HRegion region : regions) {
+      if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
+          && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
+        otherRow = region.getRegionInfo().getStartKey();
+        break;
+      }
+    }
+    assertNotNull(otherRow);
+    Put put2 = new Put(otherRow);
+    put2.add(FAM_NAM, otherRow, otherRow);
+    table.put(put2); //cache put2's location
+
+    // Check that we are in the expected state
+    Assert.assertTrue(curServer != destServer);
+    Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
+    Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
+    Assert.assertNotNull(curServer.getOnlineRegion(regionName));
+    Assert.assertNull(destServer.getOnlineRegion(regionName));
+    Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
+        getAssignmentManager().getRegionStates().isRegionsInTransition());
+
+    // Moving. It's possible that we don't have all the regions online at this point, so
+    // the test must depend only on the region we're looking at.
+ LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString()); + TEST_UTIL.getHBaseAdmin().move( + toMove.getRegionInfo().getEncodedNameAsBytes(), + destServerName.getServerName().getBytes() + ); + + while (destServer.getOnlineRegion(regionName) == null || + destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || + curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) || + master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + // wait for the move to be finished + Thread.sleep(1); + } + + LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString()); + + // Check our new state. + Assert.assertNull(curServer.getOnlineRegion(regionName)); + Assert.assertNotNull(destServer.getOnlineRegion(regionName)); + Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); + Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes)); + + + // Cache was NOT updated and points to the wrong server + Assert.assertFalse( + conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort()); + + // Hijack the number of retry to fail after 2 tries + Field numRetries = conn.getClass().getDeclaredField("numRetries"); + numRetries.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL); + final int prevNumRetriesVal = (Integer)numRetries.get(conn); + numRetries.set(conn, 2); + + Put put3 = new Put(ROW_X); + put3.add(FAM_NAM, ROW_X, ROW_X); + Put put4 = new Put(otherRow); + put4.add(FAM_NAM, otherRow, otherRow); + + // do multi + table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row, + // second we get RegionMovedException. 
+
+    numRetries.set(conn, prevNumRetriesVal);
+    table.close();
+    conn.close();
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 093af5a179f..b7378eaade3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -35,10 +35,9 @@ import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.Waiter.Predicate;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.HBaseClient;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
-import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -85,16 +84,13 @@ public class TestMultiParallel {
     if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
       // Distribute regions
       UTIL.getMiniHBaseCluster().getMaster().balance();
+
       // Wait until completing balance
-      final RegionStates regionStates = UTIL.getMiniHBaseCluster().getMaster()
-          .getAssignmentManager().getRegionStates();
-      UTIL.waitFor(15 * 1000, new Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return !regionStates.isRegionsInTransition();
-        }
-      });
+      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
     }
+    HConnection conn = HConnectionManager.getConnection(UTIL.getConfiguration());
+    conn.clearRegionCache();
+    conn.close();
 
     LOG.info("before done");
   }
@@ -143,7 +139,7 @@ public class TestMultiParallel {
    * @throws NoSuchFieldException
    * @throws SecurityException
    */
-  @Test(timeout=300000) 
+  @Test(timeout=300000)
   public void testActiveThreadsCount() throws Exception{
     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
     List<Row> puts = constructPutRequests(); // creates a Put for every region
@@ -155,7 +151,7 @@
     table.close();
   }
 
-  @Test(timeout=300000) 
+  @Test(timeout=300000)
   public void testBatchWithGet() throws Exception {
     LOG.info("test=testBatchWithGet");
     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
@@ -237,7 +233,7 @@
    *
    * @throws Exception
    */
-  @Test (timeout=300000) 
+  @Test (timeout=300000)
   public void testFlushCommitsWithAbort() throws Exception {
     LOG.info("test=testFlushCommitsWithAbort");
     doTestFlushCommits(true);
@@ -262,7 +258,7 @@
     }
     LOG.info("puts");
     table.flushCommits();
-    int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
+    final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
         .size();
     assert liveRScount > 0;
     JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
@@ -304,6 +300,18 @@
       int regions = ProtobufUtil.getOnlineRegions(t.getRegionServer()).size();
       // Assert.assertTrue("Count of regions=" + regions, regions > 10);
     }
+    if (doAbort) {
+      UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
+      UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return UTIL.getMiniHBaseCluster().getMaster()
+              .getClusterStatus().getServersSize() == (liveRScount - 1);
+        }
+      });
+      UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
+    }
+
     table.close();
     LOG.info("done");
   }
@@ -337,7 +345,7 @@
     table.close();
   }
 
-  @Test(timeout=300000) 
+  @Test(timeout=300000)
   public void testBatchWithDelete() throws Exception {
     LOG.info("test=testBatchWithDelete");
     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
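
Not part of the patch: a minimal sketch of how a test can pair the new HBaseTestingUtility#predicateNoRegionsInTransition() helper with the waitFor wrapper shown above, the way TestMultiParallel now does, rather than hand-rolling a polling loop like the one still used in TestHCM#testMulti. The class name and the assumption that a mini cluster has already been started (e.g. in a @BeforeClass method) are illustrative only.

import org.apache.hadoop.hbase.HBaseTestingUtility;

// Hypothetical example class; assumes TEST_UTIL.startMiniCluster(...) was called elsewhere,
// exactly as the tests touched by this patch do.
public class RegionsInTransitionWaitExample {
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  public static void waitUntilBalanced() throws Exception {
    // Ask the master to rebalance the regions.
    TEST_UTIL.getMiniHBaseCluster().getMaster().balance();
    // Block (up to 15 seconds) until the master reports no regions in transition,
    // using the predicate added to HBaseTestingUtility in this patch.
    TEST_UTIL.waitFor(15 * 1000, TEST_UTIL.predicateNoRegionsInTransition());
  }
}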
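
Not part of the patch: a self-contained sketch of the error-handling shape the ProtobufUtil.multi() change adopts. When the RPC for one region fails, every action routed to that region is recorded under its original index with the exception, instead of the whole batch throwing; successful regions still return their results. All names and types below are hypothetical stand-ins, not HBase classes.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in showing "result or exception per original action index".
class PerRegionBatchSketch {
  // originalIndex -> either a result value or the exception raised by that region's call
  final Map<Integer, Object> outcomes = new HashMap<>();

  // Pretend per-region RPC: succeeds or throws for the whole region.
  private List<String> callRegion(String region, List<Integer> indexes) throws Exception {
    if (region.startsWith("bad")) {
      throw new Exception("region " + region + " is unavailable");
    }
    List<String> results = new ArrayList<>();
    for (Integer i : indexes) {
      results.add("ok-" + i);
    }
    return results;
  }

  void run(Map<String, List<Integer>> actionsByRegion) {
    for (Map.Entry<String, List<Integer>> e : actionsByRegion.entrySet()) {
      Exception failure = null;
      List<String> results = null;
      try {
        results = callRegion(e.getKey(), e.getValue());
      } catch (Exception ex) {
        failure = ex; // remember the per-region failure instead of rethrowing
      }
      List<Integer> indexes = e.getValue();
      for (int i = 0; i < indexes.size(); i++) {
        // Mirrors "results == null ? ex : results.get(i)" in the patched multi():
        // a failed region stamps the same exception on each of its actions.
        outcomes.put(indexes.get(i), results == null ? failure : results.get(i));
      }
    }
  }
}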