HBASE-2898 MultiPut makes proper error handling impossible and leads to corrupted data

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1033321 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan Rawson 2010-11-10 01:54:13 +00:00
parent 1eb39d194b
commit 7e394dcb87
11 changed files with 429 additions and 222 deletions

View File

@ -667,6 +667,8 @@ Release 0.90.0 - Unreleased
HBASE-3199 large response handling: some fixups and cleanups
HBASE-3212 More testing of enable/disable uncovered base condition not in
place; i.e. that only one enable/disable runs at a time
HBASE-2898 MultiPut makes proper error handling impossible and leads to
corrupted data
IMPROVEMENTS

View File

@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@ -244,39 +243,19 @@ public interface HConnection extends Abortable {
* Process a mixed batch of Get, Put and Delete actions. All actions for a
* RegionServer are forwarded in one RPC call.
*
*
* @param actions The collection of actions.
* @param tableName Name of the hbase table
* @param pool thread pool for parallel execution
* @param results An empty array, same size as list. If an exception is thrown,
* you can test here for partial results, and to determine which actions
* processed successfully.
* @throws IOException
* @throws IOException if there are problems talking to META. Per-item
* exceptions are stored in the results array.
*/
public void processBatch(List<Row> actions, final byte[] tableName,
ExecutorService pool, Result[] results)
throws IOException;
/**
* Process a batch of Puts. Does the retries.
* @param list A batch of Puts to process.
* @param tableName The name of the table
* @return Count of committed Puts. On fault, < list.size().
* @throws IOException if a remote or network exception occurs
* @deprecated Use HConnectionManager::processBatch instead.
*/
public int processBatchOfRows(ArrayList<Put> list, byte[] tableName, ExecutorService pool)
throws IOException;
/**
* Process a batch of Deletes. Does the retries.
* @param list A batch of Deletes to process.
* @param tableName The name of the table
* @return Count of committed Deletes. On fault, < list.size().
* @throws IOException if a remote or network exception occurs
* @deprecated Use HConnectionManager::processBatch instead.
*/
public int processBatchOfDeletes(List<Delete> list, byte[] tableName, ExecutorService pool)
throws IOException;
ExecutorService pool, Object[] results)
throws IOException, InterruptedException;
/**
* Process a batch of Puts.
@ -289,7 +268,8 @@ public interface HConnection extends Abortable {
* @deprecated Use HConnectionManager::processBatch instead.
*/
public void processBatchOfPuts(List<Put> list,
final byte[] tableName, ExecutorService pool) throws IOException;
final byte[] tableName, ExecutorService pool)
throws IOException;
/**
* Enable or disable region cache prefetch for the table. It will be

View File

@ -1029,39 +1029,6 @@ public class HConnectionManager {
}
}
/**
* @deprecated Use HConnectionManager::processBatch instead.
*/
public int processBatchOfRows(final ArrayList<Put> list, final byte[] tableName, ExecutorService pool)
throws IOException {
Result[] results = new Result[list.size()];
processBatch((List) list, tableName, pool, results);
int count = 0;
for (Result r : results) {
if (r != null) {
count++;
}
}
return (count == list.size() ? -1 : count);
}
/**
* @deprecated Use HConnectionManager::processBatch instead.
*/
public int processBatchOfDeletes(final List<Delete> list,
final byte[] tableName, ExecutorService pool)
throws IOException {
Result[] results = new Result[list.size()];
processBatch((List) list, tableName, pool, results);
int count = 0;
for (Result r : results) {
if (r != null) {
count++;
}
}
return (count == list.size() ? -1 : count);
}
void close(boolean stopProxy) {
if (master != null) {
if (stopProxy) {
@ -1109,7 +1076,7 @@ public class HConnectionManager {
public void processBatch(List<Row> list,
final byte[] tableName,
ExecutorService pool,
Result[] results) throws IOException {
Object[] results) throws IOException, InterruptedException {
// results must be the same size as list
if (results.length != list.size()) {
@ -1120,8 +1087,10 @@ public class HConnectionManager {
return;
}
// Keep track of the most recent servers for any given item for better
// exceptional reporting.
HServerAddress [] lastServers = new HServerAddress[results.length];
List<Row> workingList = new ArrayList<Row>(list);
final boolean singletonList = (list.size() == 1);
boolean retry = true;
Throwable singleRowCause = null;
@ -1131,13 +1100,7 @@ public class HConnectionManager {
if (tries >= 1) {
long sleepTime = getPauseTime(tries);
LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ignore) {
LOG.debug("Interupted");
Thread.currentThread().interrupt();
break;
}
}
// step 1: break up into regionserver-sized chunks and build the data structs
@ -1157,6 +1120,7 @@ public class HConnectionManager {
}
Action action = new Action(regionName, row, i);
lastServers[i] = address;
actions.add(regionName, action);
}
}
@ -1176,58 +1140,50 @@ public class HConnectionManager {
HServerAddress address = responsePerServer.getKey();
try {
// Gather the results for one server
Future<MultiResponse> future = responsePerServer.getValue();
// Not really sure what a reasonable timeout value is. Here's a first try.
MultiResponse resp = future.get();
if (resp == null) {
// Entire server failed
LOG.debug("Failed all for server: " + address + ", removing from cache");
} else {
// For each region
for (Entry<byte[], List<Pair<Integer,Result>>> e : resp.getResults().entrySet()) {
continue;
}
for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
byte[] regionName = e.getKey();
List<Pair<Integer, Result>> regionResults = e.getValue();
for (Pair<Integer, Result> regionResult : regionResults) {
List<Pair<Integer, Object>> regionResults = e.getValue();
for (Pair<Integer, Object> regionResult : regionResults) {
if (regionResult == null) {
// if the first/only record is 'null' the entire region failed.
LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache");
LOG.debug("Failures for region: " +
Bytes.toStringBinary(regionName) +
", removing from cache");
} else {
// success
// Result might be an Exception, including DNRIOE
results[regionResult.getFirst()] = regionResult.getSecond();
}
}
}
}
} catch (InterruptedException e) {
LOG.debug("Failed all from " + address, e);
Thread.currentThread().interrupt();
break;
} catch (ExecutionException e) {
LOG.debug("Failed all from " + address, e);
// Just give up, leaving the batch incomplete
if (e.getCause() instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException) e.getCause();
}
}
if (singletonList) {
// be richer for reporting in a 1 row case.
singleRowCause = e.getCause();
}
}
}
// step 4: identify failures and prep for a retry (if applicable).
// Find failures (i.e. null Result), and add them to the workingList (in
// order), so they can be retried.
retry = false;
workingList.clear();
for (int i = 0; i < results.length; i++) {
if (results[i] == null) {
// if null (fail) or instanceof Throwable && not instanceof DNRIOE
// then retry that row. else dont.
if (results[i] == null ||
(results[i] instanceof Throwable &&
!(results[i] instanceof DoNotRetryIOException))) {
retry = true;
Row row = list.get(i);
workingList.add(row);
deleteCachedLocation(tableName, row.getRow());
@ -1238,19 +1194,31 @@ public class HConnectionManager {
}
}
if (Thread.currentThread().isInterrupted()) {
throw new IOException("Aborting attempt because of a thread interruption");
}
if (retry) {
// ran out of retries and didn't successfully finish everything!
// Simple little check for 1 item failures.
if (singleRowCause != null) {
throw new IOException(singleRowCause);
} else {
throw new RetriesExhaustedException("Still had " + workingList.size()
+ " actions left after retrying " + numRetries + " times.");
}
}
List<Throwable> exceptions = new ArrayList<Throwable>();
List<Row> actions = new ArrayList<Row>();
List<HServerAddress> addresses = new ArrayList<HServerAddress>();
for (int i = 0 ; i < results.length; i++) {
if (results[i] == null || results[i] instanceof Throwable) {
exceptions.add((Throwable)results[i]);
actions.add(list.get(i));
addresses.add(lastServers[i]);
}
}
if (!exceptions.isEmpty()) {
throw new RetriesExhaustedWithDetailsException(exceptions,
actions,
addresses);
}
}
/**
@ -1259,19 +1227,24 @@ public class HConnectionManager {
public void processBatchOfPuts(List<Put> list,
final byte[] tableName,
ExecutorService pool) throws IOException {
Result[] results = new Result[list.size()];
Object[] results = new Object[list.size()];
try {
processBatch((List) list, tableName, pool, results);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
// mutate list so that it is empty for complete success, or contains only failed records
// results are returned in the same order as the requests in list
// walk the list backwards, so we can remove from list without impacting the indexes of earlier members
for (int i = results.length - 1; i>=0; i--) {
// if result is not null, it succeeded
if (results[i] != null) {
if (results[i] instanceof Result) {
// successful Puts are removed from the list here.
list.remove(i);
}
}
}
}
private Throwable translateException(Throwable t) throws IOException {
if (t instanceof UndeclaredThrowableException) {

View File

@ -553,7 +553,21 @@ public class HTable implements HTableInterface {
}
public Result[] get(List<Get> gets) throws IOException {
return batch((List) gets);
try {
Object [] r1 = batch((List)gets);
// translate.
Result [] results = new Result[r1.length];
int i=0;
for (Object o : r1) {
// batch ensures if there is a failure we get an exception instead
results[i++] = (Result) o;
}
return results;
} catch (InterruptedException e) {
throw new IOException(e);
}
}
/**
@ -563,13 +577,15 @@ public class HTable implements HTableInterface {
* guaranteed that the Get returns what the Put had put.
*
* @param actions list of Get, Put, Delete objects
* @param results Empty Result[], same size as actions. Provides access to partial
* results, in case an exception is thrown. A null in the result array means that
* the call for that action failed, even after retries
* @param results Empty Result[], same size as actions. Provides access to
* partial results, in case an exception is thrown. If there are any failures,
* there will be a null or Throwable will be in the results array, AND an
* exception will be thrown.
* @throws IOException
*/
@Override
public synchronized void batch(final List<Row> actions, final Result[] results) throws IOException {
public synchronized void batch(final List<Row> actions, final Object[] results)
throws InterruptedException, IOException {
connection.processBatch(actions, tableName, pool, results);
}
@ -582,8 +598,8 @@ public class HTable implements HTableInterface {
* @throws IOException
*/
@Override
public synchronized Result[] batch(final List<Row> actions) throws IOException {
Result[] results = new Result[actions.size()];
public synchronized Object[] batch(final List<Row> actions) throws InterruptedException, IOException {
Object[] results = new Object[actions.size()];
connection.processBatch(actions, tableName, pool, results);
return results;
}
@ -616,23 +632,28 @@ public class HTable implements HTableInterface {
* the {@code deletes} argument will contain the {@link Delete} instances
* that have not be successfully applied.
* @since 0.20.1
* @see {@link #batch(java.util.List, Object[])}
*/
@Override
public void delete(final List<Delete> deletes)
throws IOException {
Result[] results = new Result[deletes.size()];
Object[] results = new Object[deletes.size()];
try {
connection.processBatch((List) deletes, tableName, pool, results);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
// mutate list so that it is empty for complete success, or contains only failed records
// results are returned in the same order as the requests in list
// walk the list backwards, so we can remove from list without impacting the indexes of earlier members
for (int i = results.length - 1; i>=0; i--) {
// if result is not null, it succeeded
if (results[i] != null) {
if (results[i] instanceof Result) {
deletes.remove(i);
}
}
}
}
@Override
public void put(final Put put) throws IOException {

View File

@ -74,24 +74,25 @@ public interface HTableInterface {
* Method that does a batch call on Deletes, Gets and Puts.
*
* @param actions list of Get, Put, Delete objects
* @param results Empty Result[], same size as actions. Provides access to partial
* @param results Empty Object[], same size as actions. Provides access to partial
* results, in case an exception is thrown. A null in the result array means that
* the call for that action failed, even after retries
* @throws IOException
* @since 0.90.0
*/
void batch(final List<Row> actions, final Result[] results) throws IOException;
void batch(final List<Row> actions, final Object[] results) throws IOException, InterruptedException;
/**
* Method that does a batch call on Deletes, Gets and Puts.
*
*
* @param actions list of Get, Put, Delete objects
* @return the results from the actions. A null in the return array means that
* the call for that action failed, even after retries
* @throws IOException
* @since 0.90.0
*/
Result[] batch(final List<Row> actions) throws IOException;
Object[] batch(final List<Row> actions) throws IOException, InterruptedException;
/**
* Extracts certain cells from a given row.

View File

@ -25,10 +25,14 @@ import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;
import java.io.DataOutput;
import java.io.IOException;
import java.io.DataInput;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -42,8 +46,8 @@ public class MultiResponse implements Writable {
// map of regionName to list of (Results paired to the original index for that
// Result)
private Map<byte[], List<Pair<Integer, Result>>> results =
new TreeMap<byte[], List<Pair<Integer, Result>>>(Bytes.BYTES_COMPARATOR);
private Map<byte[], List<Pair<Integer, Object>>> results =
new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR);
public MultiResponse() {
}
@ -68,32 +72,52 @@ public class MultiResponse implements Writable {
* (request). Second item is the Result. Result will be empty for
* successful Put and Delete actions.
*/
public void add(byte[] regionName, Pair<Integer, Result> r) {
List<Pair<Integer, Result>> rs = results.get(regionName);
public void add(byte[] regionName, Pair<Integer, Object> r) {
List<Pair<Integer, Object>> rs = results.get(regionName);
if (rs == null) {
rs = new ArrayList<Pair<Integer, Result>>();
rs = new ArrayList<Pair<Integer, Object>>();
results.put(regionName, rs);
}
rs.add(r);
}
public Map<byte[], List<Pair<Integer, Result>>> getResults() {
public void add(byte []regionName, int originalIndex, Object resOrEx) {
add(regionName, new Pair<Integer,Object>(originalIndex, resOrEx));
}
public Map<byte[], List<Pair<Integer, Object>>> getResults() {
return results;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(results.size());
for (Map.Entry<byte[], List<Pair<Integer, Result>>> e : results.entrySet()) {
for (Map.Entry<byte[], List<Pair<Integer, Object>>> e : results.entrySet()) {
Bytes.writeByteArray(out, e.getKey());
List<Pair<Integer, Result>> lst = e.getValue();
List<Pair<Integer, Object>> lst = e.getValue();
out.writeInt(lst.size());
for (Pair<Integer, Result> r : lst) {
for (Pair<Integer, Object> r : lst) {
if (r == null) {
out.writeInt(-1); // Cant have index -1; on other side we recognize -1 as 'null'
} else {
out.writeInt(r.getFirst()); // Can this can npe!?!
HbaseObjectWritable.writeObject(out, r.getSecond(), Result.class, null);
Object obj = r.getSecond();
if (obj instanceof Throwable) {
out.writeBoolean(true); // true, Throwable/exception.
Throwable t = (Throwable) obj;
// serialize exception
WritableUtils.writeString(out, t.getClass().getName());
WritableUtils.writeString(out,
StringUtils.stringifyException(t));
} else {
out.writeBoolean(false); // no exception
if (! (obj instanceof Writable))
obj = null; // squash all non-writables to null.
HbaseObjectWritable.writeObject(out, obj, Result.class, null);
}
}
}
}
@ -106,15 +130,33 @@ public class MultiResponse implements Writable {
for (int i = 0; i < mapSize; i++) {
byte[] key = Bytes.readByteArray(in);
int listSize = in.readInt();
List<Pair<Integer, Result>> lst = new ArrayList<Pair<Integer, Result>>(
List<Pair<Integer, Object>> lst = new ArrayList<Pair<Integer, Object>>(
listSize);
for (int j = 0; j < listSize; j++) {
Integer idx = in.readInt();
if (idx == -1) {
lst.add(null);
} else {
Result r = (Result) HbaseObjectWritable.readObject(in, null);
lst.add(new Pair<Integer, Result>(idx, r));
boolean isException = in.readBoolean();
Object o = null;
if (isException) {
String klass = WritableUtils.readString(in);
String desc = WritableUtils.readString(in);
try {
// the type-unsafe insertion, but since we control what klass is..
Class<? extends Throwable> c = (Class<? extends Throwable>) Class.forName(klass);
Constructor<? extends Throwable> cn = c.getDeclaredConstructor(String.class);
o = cn.newInstance(desc);
} catch (ClassNotFoundException ignored) {
} catch (NoSuchMethodException ignored) {
} catch (InvocationTargetException ignored) {
} catch (InstantiationException ignored) {
} catch (IllegalAccessException ignored) {
}
} else {
o = HbaseObjectWritable.readObject(in, null);
}
lst.add(new Pair<Integer, Object>(idx, o));
}
}
results.put(key, lst);

View File

@ -0,0 +1,137 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HServerAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* This subclass of {@link org.apache.hadoop.hbase.client.RetriesExhaustedException}
* is thrown when we have more information about which rows were causing which
* exceptions on what servers. You can call {@link #mayHaveClusterIssues()}
* and if the result is false, you have input error problems, otherwise you
* may have cluster issues. You can iterate over the causes, rows and last
* known server addresses via {@link #getNumExceptions()} and
* {@link #getCause(int)}, {@link #getRow(int)} and {@link #getAddress(int)}.
*/
public class RetriesExhaustedWithDetailsException extends RetriesExhaustedException {
List<Throwable> exceptions;
List<Row> actions;
List<HServerAddress> addresses;
public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
List<Row> actions,
List<HServerAddress> addresses) {
super("Failed " + exceptions.size() + " action" +
pluralize(exceptions) + ": " +
getDesc(exceptions,actions,addresses));
this.exceptions = exceptions;
this.actions = actions;
this.addresses = addresses;
}
public List<Throwable> getCauses() {
return exceptions;
}
public int getNumExceptions() {
return exceptions.size();
}
public Throwable getCause(int i) {
return exceptions.get(i);
}
public Row getRow(int i) {
return actions.get(i);
}
public HServerAddress getAddress(int i) {
return addresses.get(i);
}
public boolean mayHaveClusterIssues() {
boolean res = false;
// If all of the exceptions are DNRIOE not exception
for (Throwable t : exceptions) {
if ( !(t instanceof DoNotRetryIOException)) {
res = true;
}
}
return res;
}
public static String pluralize(Collection<?> c) {
return pluralize(c.size());
}
public static String pluralize(int c) {
return c > 1 ? "s" : "";
}
public static String getDesc(List<Throwable> exceptions,
List<Row> actions,
List<HServerAddress> addresses) {
String s = getDesc(classifyExs(exceptions));
s += "servers with issues: ";
Set<HServerAddress> uniqAddr = new HashSet<HServerAddress>();
uniqAddr.addAll(addresses);
for(HServerAddress addr : uniqAddr) {
s += addr + ", ";
}
return s;
}
public static Map<String, Integer> classifyExs(List<Throwable> ths) {
Map<String, Integer> cls = new HashMap<String, Integer>();
for (Throwable t : ths) {
String name = t.getClass().getSimpleName();
Integer i = cls.get(name);
if (i == null) {
i = 0;
}
i += 1;
cls.put(name, i);
}
return cls;
}
public static String getDesc(Map<String,Integer> classificaton) {
String s = "";
for (Map.Entry<String, Integer> e : classificaton.entrySet()) {
s += e.getKey() + ": " + e.getValue() + " time" +
pluralize(e.getValue()) + ", ";
}
return s;
}
}

View File

@ -46,12 +46,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@ -1884,7 +1886,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
String lockName = String.valueOf(lockId);
Integer rl = rowlocks.get(lockName);
if (rl == null) {
throw new IOException("Invalid row lock");
throw new UnknownRowLockException("Invalid row lock");
}
this.leases.renewLease(lockName);
return rl;
@ -2374,7 +2376,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
@SuppressWarnings("unchecked")
@Override
public MultiResponse multi(MultiAction multi) throws IOException {
MultiResponse response = new MultiResponse();
for (Map.Entry<byte[], List<Action>> e : multi.actions.entrySet()) {
byte[] regionName = e.getKey();
List<Action> actionsForRegion = e.getValue();
@ -2382,24 +2386,26 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// end of a region, so that we don't have to try the rest of the
// actions in the list.
Collections.sort(actionsForRegion);
Row action = null;
Row action;
List<Action> puts = new ArrayList<Action>();
try {
for (Action a : actionsForRegion) {
action = a.getAction();
// TODO catch exceptions so we can report them on a per-item basis.
int originalIndex = a.getOriginalIndex();
try {
if (action instanceof Delete) {
delete(regionName, (Delete) action);
response.add(regionName, new Pair<Integer, Result>(
a.getOriginalIndex(), new Result()));
response.add(regionName, originalIndex, new Result());
} else if (action instanceof Get) {
response.add(regionName, new Pair<Integer, Result>(
a.getOriginalIndex(), get(regionName, (Get) action)));
response.add(regionName, originalIndex, get(regionName, (Get) action));
} else if (action instanceof Put) {
puts.add(a);
puts.add(a); // wont throw.
} else {
LOG.debug("Error: invalid Action, row must be a Get, Delete or Put.");
throw new IllegalArgumentException("Invalid Action, row must be a Get, Delete or Put.");
throw new DoNotRetryIOException("Invalid Action, row must be a Get, Delete or Put.");
}
} catch (IOException ex) {
response.add(regionName, originalIndex, ex);
}
}
@ -2407,46 +2413,54 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// we so need. All this data munging doesn't seem great, but at least
// we arent copying bytes or anything.
if (!puts.isEmpty()) {
try {
HRegion region = getRegion(regionName);
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
Pair<Put,Integer> [] putsWithLocks = new Pair[puts.size()];
int i = 0;
List<Pair<Put,Integer>> putsWithLocks =
Lists.newArrayListWithCapacity(puts.size());
for (Action a : puts) {
Put p = (Put) a.getAction();
Integer lock = getLockFromId(p.getLockId());
putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
Integer lock;
try {
lock = getLockFromId(p.getLockId());
} catch (UnknownRowLockException ex) {
response.add(regionName, a.getOriginalIndex(), ex);
continue;
}
putsWithLocks.add(new Pair<Put, Integer>(p, lock));
}
this.requestCount.addAndGet(puts.size());
OperationStatusCode[] codes = region.put(putsWithLocks);
for( i = 0 ; i < codes.length ; i++) {
OperationStatusCode[] codes =
region.put(putsWithLocks.toArray(new Pair[]{}));
for( int i = 0 ; i < codes.length ; i++) {
OperationStatusCode code = codes[i];
Action theAction = puts.get(i);
Result result = null;
Object result = null;
if (code == OperationStatusCode.SUCCESS) {
result = new Result();
} else if (code == OperationStatusCode.BAD_FAMILY) {
result = new NoSuchColumnFamilyException();
}
// TODO turning the alternate exception into a different result
// FAILURE && NOT_RUN becomes null, aka: need to run again.
response.add(regionName,
new Pair<Integer, Result>(
theAction.getOriginalIndex(), result));
}
response.add(regionName, theAction.getOriginalIndex(), result);
}
} catch (IOException ioe) {
if (multi.size() == 1) throw ioe;
LOG.debug("Exception processing " +
org.apache.commons.lang.StringUtils.abbreviate(action.toString(), 64) +
"; " + ioe.getMessage());
response.add(regionName,null);
// stop processing on this region, continue to the next.
// fail all the puts with the ioe in question.
for (Action a: puts) {
response.add(regionName, a.getOriginalIndex(), ioe);
}
}
}
}
return response;

View File

@ -23,6 +23,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
/**
* Thrown by the region server when it is shutting down state.
*
* Should NEVER be thrown to HBase clients, they will abort the call chain
* and not retry even though regions will transition to new servers.
*/
@SuppressWarnings("serial")
public class RegionServerStoppedException extends DoNotRetryIOException {

View File

@ -604,12 +604,12 @@ public class RemoteHTable implements HTableInterface {
}
@Override
public void batch(List<Row> actions, Result[] results) throws IOException {
public void batch(List<Row> actions, Object[] results) throws IOException {
throw new IOException("batch not supported");
}
@Override
public Result[] batch(List<Row> actions) throws IOException {
public Object[] batch(List<Row> actions) throws IOException {
throw new IOException("batch not supported");
}

View File

@ -35,6 +35,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestMultiParallel {
private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@ -137,6 +139,35 @@ public class TestMultiParallel {
}
}
@Test
public void testBadFam() throws Exception {
LOG.info("test=testBadFam");
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
List<Row> actions = new ArrayList<Row>();
Put p = new Put(Bytes.toBytes("row1"));
p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
actions.add(p);
p = new Put(Bytes.toBytes("row2"));
p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
actions.add(p);
// row1 and row2 should be in the same region.
Object [] r = new Object[actions.size()];
try {
table.batch(actions, r);
fail();
} catch (RetriesExhaustedWithDetailsException ex) {
LOG.debug(ex);
// good!
assertFalse(ex.mayHaveClusterIssues());
}
assertEquals(2, r.length);
assertTrue(r[0] instanceof Throwable);
assertTrue(r[1] instanceof Result);
}
/**
* Only run one Multi test with a forced RegionServer abort. Otherwise, the
* unit tests will take an unnecessarily long time to run.
@ -208,7 +239,7 @@ public class TestMultiParallel {
// put multiple rows using a batch
List<Row> puts = constructPutRequests();
Result[] results = table.batch(puts);
Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
if (true) {
@ -228,7 +259,7 @@ public class TestMultiParallel {
// Load some data
List<Row> puts = constructPutRequests();
Result[] results = table.batch(puts);
Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
// Deletes
@ -256,7 +287,7 @@ public class TestMultiParallel {
// Load some data
List<Row> puts = constructPutRequests();
Result[] results = table.batch(puts);
Object[] results = table.batch(puts);
validateSizeAndEmpty(results, KEYS.length);
// Deletes
@ -289,7 +320,7 @@ public class TestMultiParallel {
put.add(BYTES_FAMILY, qual, VALUE);
puts.add(put);
}
Result[] results = table.batch(puts);
Object[] results = table.batch(puts);
// validate
validateSizeAndEmpty(results, 100);
@ -303,10 +334,10 @@ public class TestMultiParallel {
gets.add(get);
}
Result[] multiRes = table.batch(gets);
Object[] multiRes = table.batch(gets);
int idx = 0;
for (Result r : multiRes) {
for (Object r : multiRes) {
byte[] qual = Bytes.toBytes("column" + idx);
validateResult(r, qual, VALUE);
idx++;
@ -319,7 +350,7 @@ public class TestMultiParallel {
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
// Load some data to start
Result[] results = table.batch(constructPutRequests());
Object[] results = table.batch(constructPutRequests());
validateSizeAndEmpty(results, KEYS.length);
// Batch: get, get, put(new col), delete, get, get of put, get of deleted,
@ -383,11 +414,13 @@ public class TestMultiParallel {
// // Helper methods ////
private void validateResult(Result r) {
private void validateResult(Object r) {
validateResult(r, QUALIFIER, VALUE);
}
private void validateResult(Result r, byte[] qual, byte[] val) {
private void validateResult(Object r1, byte[] qual, byte[] val) {
// TODO provide nice assert here or something.
Result r = (Result)r1;
Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
Assert.assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual)));
}
@ -415,16 +448,17 @@ public class TestMultiParallel {
}
}
private void validateEmpty(Result result) {
private void validateEmpty(Object r1) {
Result result = (Result)r1;
Assert.assertTrue(result != null);
Assert.assertTrue(result.getRow() == null);
Assert.assertEquals(0, result.raw().length);
}
private void validateSizeAndEmpty(Result[] results, int expectedSize) {
private void validateSizeAndEmpty(Object[] results, int expectedSize) {
// Validate got back the same number of Result objects, all empty
Assert.assertEquals(expectedSize, results.length);
for (Result result : results) {
for (Object result : results) {
validateEmpty(result);
}
}