HBASE-8036 ProtobufUtil.multi behavior is inconsistent in case of errors

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1456063 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2013-03-13 17:51:02 +00:00
parent 64cdeb630c
commit 8c68535d1a
5 changed files with 258 additions and 101 deletions

View File

@ -18,7 +18,35 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -65,34 +93,7 @@ import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.protobuf.ServiceException;
/**
* A non-instantiable class that manages {@link HConnection}s.
@ -932,7 +933,7 @@ public class HConnectionManager {
throws IOException {
return locateRegions (tableName, false, true);
}
@Override
public List<HRegionLocation> locateRegions(final byte[] tableName, final boolean useCache,
final boolean offlined) throws IOException {
@ -2087,7 +2088,9 @@ public class HConnectionManager {
for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
for (Action<R> action : actions) {
Row row = action.getAction();
hci.updateCachedLocations(tableName, row, exception, currentTask.getSecond());
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
hci.updateCachedLocations(tableName, row, null, currentTask.getSecond());
if (noRetry) {
errors.add(exception, row, currentTask);
} else {

View File

@ -17,15 +17,22 @@
*/
package org.apache.hadoop.hbase.protobuf;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
@ -107,22 +114,15 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
* Protobufs utility.
@ -1021,37 +1021,43 @@ public final class ProtobufUtil {
*/
public static <R> MultiResponse multi(final ClientProtocol client,
final MultiAction<R> multi) throws IOException {
try {
MultiResponse response = new MultiResponse();
for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
byte[] regionName = e.getKey();
int rowMutations = 0;
List<Action<R>> actions = e.getValue();
for (Action<R> action: actions) {
Row row = action.getAction();
if (row instanceof RowMutations) {
MultiResponse response = new MultiResponse();
for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
byte[] regionName = e.getKey();
int rowMutations = 0;
List<Action<R>> actions = e.getValue();
for (Action<R> action: actions) {
Row row = action.getAction();
if (row instanceof RowMutations) {
try {
MultiRequest request =
RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
client.multi(null, request);
response.add(regionName, action.getOriginalIndex(), new Result());
rowMutations++;
}
}
if (actions.size() > rowMutations) {
MultiRequest request =
RequestConverter.buildMultiRequest(regionName, actions);
ClientProtos.MultiResponse proto = client.multi(null, request);
List<Object> results = ResponseConverter.getResults(proto);
for (int i = 0, n = results.size(); i < n; i++) {
int originalIndex = actions.get(i).getOriginalIndex();
response.add(regionName, originalIndex, results.get(i));
} catch (ServiceException se) {
response.add(regionName, action.getOriginalIndex(), getRemoteException(se));
}
rowMutations++;
}
}
if (actions.size() > rowMutations) {
Exception ex = null;
List<Object> results = null;
try {
MultiRequest request =
RequestConverter.buildMultiRequest(regionName, actions);
ClientProtos.MultiResponse proto = client.multi(null, request);
results = ResponseConverter.getResults(proto);
} catch (ServiceException se) {
ex = getRemoteException(se);
}
for (int i = 0, n = actions.size(); i < n; i++) {
int originalIndex = actions.get(i).getOriginalIndex();
response.add(regionName, originalIndex, results == null ? ex : results.get(i));
}
}
return response;
} catch (ServiceException se) {
throw getRemoteException(se);
}
return response;
}
/**

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -2441,7 +2442,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public void setFileSystemURI(String fsURI) {
FS_URI = fsURI;
}
/**
* Wrapper method for {@link Waiter#waitFor(Configuration, long, Predicate)}.
*/
@ -2465,4 +2466,19 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
boolean failIfTimeout, Predicate<E> predicate) throws E {
return Waiter.waitFor(this.conf, timeout, interval, failIfTimeout, predicate);
}
/**
* Returns a {@link Predicate} for checking that there are no regions in transition in master
*/
public Waiter.Predicate<Exception> predicateNoRegionsInTransition() {
return new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
final RegionStates regionStates = getMiniHBaseCluster().getMaster()
.getAssignmentManager().getRegionStates();
return !regionStates.isRegionsInTransition();
}
};
}
}

View File

@ -18,10 +18,10 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
@ -38,11 +38,17 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@ -52,6 +58,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
/**
* This class is for testing HCM features
*/
@ -62,8 +70,10 @@ public class TestHCM {
private static final byte[] TABLE_NAME = Bytes.toBytes("test");
private static final byte[] TABLE_NAME1 = Bytes.toBytes("test1");
private static final byte[] TABLE_NAME2 = Bytes.toBytes("test2");
private static final byte[] TABLE_NAME3 = Bytes.toBytes("test3");
private static final byte[] FAM_NAM = Bytes.toBytes("f");
private static final byte[] ROW = Bytes.toBytes("bbb");
private static final byte[] ROW_X = Bytes.toBytes("xxx");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@ -106,14 +116,14 @@ public class TestHCM {
private static int getHConnectionManagerCacheSize(){
return HConnectionTestingUtility.getConnectionCount();
}
@Test
public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
// Save off current HConnections
Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
new HashMap<HConnectionKey, HConnectionImplementation>();
oldHBaseInstances.putAll(HConnectionManager.HBASE_INSTANCES);
HConnectionManager.HBASE_INSTANCES.clear();
try {
@ -536,6 +546,120 @@ public class TestHCM {
conn.close();
}
@Test
public void testMulti() throws Exception {
HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
TEST_UTIL.createMultiRegions(table, FAM_NAM);
HConnectionManager.HConnectionImplementation conn =
(HConnectionManager.HConnectionImplementation)
HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
// We're now going to move the region and check that it works for the client
// First a new put to add the location in the cache
conn.clearRegionCache(TABLE_NAME3);
Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
// We can wait for all regions to be online, that makes log reading easier when debugging
while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
Thread.sleep(1);
}
Put put = new Put(ROW_X);
put.add(FAM_NAM, ROW_X, ROW_X);
table.put(put);
// Now moving the region to the second server
HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X);
byte[] regionName = toMove.getRegionInfo().getRegionName();
byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
// Choose the other server.
int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
int destServerId = (curServerId == 0 ? 1 : 0);
HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
ServerName destServerName = destServer.getServerName();
//find another row in the cur server that is less than ROW_X
List<HRegion> regions = curServer.getOnlineRegions(TABLE_NAME3);
byte[] otherRow = null;
for (HRegion region : regions) {
if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
&& Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
otherRow = region.getRegionInfo().getStartKey();
break;
}
}
assertNotNull(otherRow);
Put put2 = new Put(otherRow);
put2.add(FAM_NAM, otherRow, otherRow);
table.put(put2); //cache put2's location
// Check that we are in the expected state
Assert.assertTrue(curServer != destServer);
Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
Assert.assertNotNull(curServer.getOnlineRegion(regionName));
Assert.assertNull(destServer.getOnlineRegion(regionName));
Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().getRegionStates().isRegionsInTransition());
// Moving. It's possible that we don't have all the regions online at this point, so
// the test must depends only on the region we're looking at.
LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
TEST_UTIL.getHBaseAdmin().move(
toMove.getRegionInfo().getEncodedNameAsBytes(),
destServerName.getServerName().getBytes()
);
while (destServer.getOnlineRegion(regionName) == null ||
destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
// wait for the move to be finished
Thread.sleep(1);
}
LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
// Check our new state.
Assert.assertNull(curServer.getOnlineRegion(regionName));
Assert.assertNotNull(destServer.getOnlineRegion(regionName));
Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
// Cache was NOT updated and points to the wrong server
Assert.assertFalse(
conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
// Hijack the number of retry to fail after 2 tries
Field numRetries = conn.getClass().getDeclaredField("numRetries");
numRetries.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
final int prevNumRetriesVal = (Integer)numRetries.get(conn);
numRetries.set(conn, 2);
Put put3 = new Put(ROW_X);
put3.add(FAM_NAM, ROW_X, ROW_X);
Put put4 = new Put(otherRow);
put4.add(FAM_NAM, otherRow, otherRow);
// do multi
table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
// second we get RegionMovedException.
numRetries.set(conn, prevNumRetriesVal);
table.close();
conn.close();
}
}

View File

@ -35,10 +35,9 @@ import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.HBaseClient;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@ -85,16 +84,13 @@ public class TestMultiParallel {
if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
// Distribute regions
UTIL.getMiniHBaseCluster().getMaster().balance();
// Wait until completing balance
final RegionStates regionStates = UTIL.getMiniHBaseCluster().getMaster()
.getAssignmentManager().getRegionStates();
UTIL.waitFor(15 * 1000, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !regionStates.isRegionsInTransition();
}
});
UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
}
HConnection conn = HConnectionManager.getConnection(UTIL.getConfiguration());
conn.clearRegionCache();
conn.close();
LOG.info("before done");
}
@ -143,7 +139,7 @@ public class TestMultiParallel {
* @throws NoSuchFieldException
* @throws SecurityException
*/
@Test(timeout=300000)
@Test(timeout=300000)
public void testActiveThreadsCount() throws Exception{
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
List<Row> puts = constructPutRequests(); // creates a Put for every region
@ -155,7 +151,7 @@ public class TestMultiParallel {
table.close();
}
@Test(timeout=300000)
@Test(timeout=300000)
public void testBatchWithGet() throws Exception {
LOG.info("test=testBatchWithGet");
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
@ -237,7 +233,7 @@ public class TestMultiParallel {
*
* @throws Exception
*/
@Test (timeout=300000)
@Test (timeout=300000)
public void testFlushCommitsWithAbort() throws Exception {
LOG.info("test=testFlushCommitsWithAbort");
doTestFlushCommits(true);
@ -262,7 +258,7 @@ public class TestMultiParallel {
}
LOG.info("puts");
table.flushCommits();
int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
.size();
assert liveRScount > 0;
JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
@ -304,6 +300,18 @@ public class TestMultiParallel {
int regions = ProtobufUtil.getOnlineRegions(t.getRegionServer()).size();
// Assert.assertTrue("Count of regions=" + regions, regions > 10);
}
if (doAbort) {
UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return UTIL.getMiniHBaseCluster().getMaster()
.getClusterStatus().getServersSize() == (liveRScount - 1);
}
});
UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
}
table.close();
LOG.info("done");
}
@ -337,7 +345,7 @@ public class TestMultiParallel {
table.close();
}
@Test(timeout=300000)
@Test(timeout=300000)
public void testBatchWithDelete() throws Exception {
LOG.info("test=testBatchWithDelete");
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);