hbase-9390: coprocessors observers are not called during a recovery with the new log replay algorithm - review addendum

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1527444 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jeffreyz 2013-09-30 05:13:03 +00:00
parent 678e8c52bf
commit 266e1791d1
3 changed files with 109 additions and 86 deletions

View File

@ -5610,83 +5610,4 @@ public class HRegion implements HeapSize { // , Writable{
}
}
}
/**
* This function is used to construct replay mutations from WALEdits
* @param entries
* @param cells
* @param clusterId
* @param logEntries List of Pair<HLogKey, WALEdit> contructed from its PB version - WALEntry
* instances
* @return list of Pair<MutationType, Mutation> to be replayed
* @throws IOException
*/
List<Pair<MutationType, Mutation>> getReplayMutations(List<WALEntry> entries,
CellScanner cells, UUID clusterId, List<Pair<HLogKey, WALEdit>> logEntries)
throws IOException {
List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
List<Pair<MutationType, Mutation>> tmpEditMutations =
new ArrayList<Pair<MutationType, Mutation>>();
for (WALEntry entry : entries) {
HLogKey logKey = null;
WALEdit val = null;
Cell previousCell = null;
Mutation m = null;
tmpEditMutations.clear();
int count = entry.getAssociatedCellCount();
if (coprocessorHost != null) {
val = new WALEdit();
}
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
boolean isNewRowOrType =
previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
|| !CellUtil.matchingRow(previousCell, cell);
if (isNewRowOrType) {
// Create new mutation
if (CellUtil.isDelete(cell)) {
m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
tmpEditMutations.add(new Pair<MutationType, Mutation>(MutationType.DELETE, m));
} else {
m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
tmpEditMutations.add(new Pair<MutationType, Mutation>(MutationType.PUT, m));
}
}
if (CellUtil.isDelete(cell)) {
((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
} else {
((Put) m).add(KeyValueUtil.ensureKeyValue(cell));
}
previousCell = cell;
}
// Start coprocessor replay here. The coprocessor is for each WALEdit
// instead of a KeyValue.
if (coprocessorHost != null) {
WALKey walKey = entry.getKey();
logKey =
new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
.getTableName().toByteArray()), walKey.getLogSequenceNumber(),
walKey.getWriteTime(), clusterId);
if (coprocessorHost.preWALRestore(this.getRegionInfo(), logKey, val)) {
// if bypass this log entry, ignore it ...
continue;
}
logEntries.add(new Pair<HLogKey, WALEdit>(logKey, val));
}
mutations.addAll(tmpEditMutations);
}
return mutations;
}
}

View File

@ -197,6 +197,7 @@ import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -3888,9 +3889,27 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
HRegion region = this.getRegionByEncodedName(
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
List<Pair<MutationType, Mutation>> mutations = region.getReplayMutations(
request.getEntryList(), cells, UUID.fromString(this.clusterId), walEntries);
List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
for (WALEntry entry : entries) {
Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<HLogKey, WALEdit>();
List<Pair<MutationType, Mutation>> edits = HLogSplitter.getMutationsFromWALEntry(entry,
cells, walEntry);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.
if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
walEntry.getSecond())) {
// if bypass this log entry, ignore it ...
continue;
}
walEntries.add(walEntry);
}
mutations.addAll(edits);
}
if (!mutations.isEmpty()) {
OperationStatus[] result = doBatchOp(region, mutations, true);
// check if it's a partial success
@ -3900,9 +3919,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
}
}
}
if (region.getCoprocessorHost() != null) {
if (coprocessorHost != null) {
for (Pair<HLogKey, WALEdit> wal : walEntries) {
region.getCoprocessorHost().postWALRestore(region.getRegionInfo(), wal.getFirst(),
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
wal.getSecond());
}
}
@ -4117,12 +4136,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
}
/**
* Execute a list of Put/Delete mutations.
* Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
* constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
* @param region
* @param mutations
* @param isReplay
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any
* @return an array of OperationStatus which internally contains the OperationStatusCode and the
* exceptionMessage if any
* @throws IOException
*/
protected OperationStatus[] doBatchOp(final HRegion region,

View File

@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@ -52,6 +53,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -64,6 +69,7 @@ import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@ -73,9 +79,13 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -1827,4 +1837,76 @@ public class HLogSplitter {
super(s);
}
}
/**
* This function is used to construct mutations from a WALEntry. It also reconstructs HLogKey &
* WALEdit from the passed in WALEntry
* @param entry
* @param cells
* @param logEntry pair of HLogKey and WALEdit instance stores HLogKey and WALEdit instances
* extracted from the passed in WALEntry.
* @return list of Pair<MutationType, Mutation> to be replayed
* @throws IOException
*/
public static List<Pair<MutationType, Mutation>> getMutationsFromWALEntry(WALEntry entry,
CellScanner cells, Pair<HLogKey, WALEdit> logEntry) throws IOException {
if (entry == null) {
// return an empty array
return new ArrayList<Pair<MutationType, Mutation>>();
}
int count = entry.getAssociatedCellCount();
List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
Cell previousCell = null;
Mutation m = null;
HLogKey key = null;
WALEdit val = null;
if (logEntry != null) val = new WALEdit();
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
boolean isNewRowOrType =
previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
|| !CellUtil.matchingRow(previousCell, cell);
if (isNewRowOrType) {
// Create new mutation
if (CellUtil.isDelete(cell)) {
m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
mutations.add(new Pair<MutationType, Mutation>(MutationType.DELETE, m));
} else {
m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
mutations.add(new Pair<MutationType, Mutation>(MutationType.PUT, m));
}
}
if (CellUtil.isDelete(cell)) {
((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
} else {
((Put) m).add(KeyValueUtil.ensureKeyValue(cell));
}
previousCell = cell;
}
// reconstruct HLogKey
if (logEntry != null) {
WALKey walKey = entry.getKey();
List<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
}
key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
.getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(),
clusterIds);
logEntry.setFirst(key);
logEntry.setSecond(val);
}
return mutations;
}
}