HBASE-11475 Distributed log replay should also replay compaction events

Enis Soztutar 2014-07-10 10:50:22 -07:00
parent 95ef3acdd3
commit 09fcdb50af
12 changed files with 270 additions and 55 deletions

View File

@@ -2495,6 +2495,7 @@ public final class ProtobufUtil {
for (Path outputPath : outputPaths) {
builder.addCompactionOutput(outputPath.getName());
}
builder.setRegionName(HBaseZeroCopyByteString.wrap(info.getRegionName()));
return builder.build();
}
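This hunk stamps every descriptor built by ProtobufUtil.toCompactionDescriptor with the full region name. A minimal sketch of the resulting round trip, assuming the five-argument signature exercised by TestIOFencing below; the helper class is hypothetical and not part of this patch:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;

// Hypothetical check, not part of this patch.
final class RegionNameStampCheck {
  // True when the descriptor built for `info` round-trips its full region name.
  static boolean carriesRegionName(HRegionInfo info, byte[] family,
      List<Path> inputs, List<Path> outputs, Path storeDir) {
    CompactionDescriptor cd =
        ProtobufUtil.toCompactionDescriptor(info, family, inputs, outputs, storeDir);
    return cd.hasRegionName()
        && Arrays.equals(cd.getRegionName().toByteArray(), info.getRegionName());
  }
}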

View File

@@ -3725,6 +3725,24 @@ public final class WALProtos {
*/
com.google.protobuf.ByteString
getStoreHomeDirBytes();
// optional bytes region_name = 7;
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
boolean hasRegionName();
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
com.google.protobuf.ByteString getRegionName();
}
/**
* Protobuf type {@code CompactionDescriptor}
@@ -3821,6 +3839,11 @@ public final class WALProtos {
storeHomeDir_ = input.readBytes();
break;
}
case 58: {
bitField0_ |= 0x00000010;
regionName_ = input.readBytes();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4026,6 +4049,30 @@ public final class WALProtos {
}
}
// optional bytes region_name = 7;
public static final int REGION_NAME_FIELD_NUMBER = 7;
private com.google.protobuf.ByteString regionName_;
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public boolean hasRegionName() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public com.google.protobuf.ByteString getRegionName() {
return regionName_;
}
private void initFields() {
tableName_ = com.google.protobuf.ByteString.EMPTY;
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
@@ -4033,6 +4080,7 @@ public final class WALProtos {
compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
storeHomeDir_ = "";
regionName_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4080,6 +4128,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(6, getStoreHomeDirBytes());
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBytes(7, regionName_);
}
getUnknownFields().writeTo(output);
}
@@ -4123,6 +4174,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(6, getStoreHomeDirBytes());
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(7, regionName_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4170,6 +4225,11 @@ public final class WALProtos {
result = result && getStoreHomeDir()
.equals(other.getStoreHomeDir());
}
result = result && (hasRegionName() == other.hasRegionName());
if (hasRegionName()) {
result = result && getRegionName()
.equals(other.getRegionName());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -4207,6 +4267,10 @@ public final class WALProtos {
hash = (37 * hash) + STORE_HOME_DIR_FIELD_NUMBER;
hash = (53 * hash) + getStoreHomeDir().hashCode();
}
if (hasRegionName()) {
hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
hash = (53 * hash) + getRegionName().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -4336,6 +4400,8 @@ public final class WALProtos {
bitField0_ = (bitField0_ & ~0x00000010);
storeHomeDir_ = "";
bitField0_ = (bitField0_ & ~0x00000020);
regionName_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -4392,6 +4458,10 @@ public final class WALProtos {
to_bitField0_ |= 0x00000008;
}
result.storeHomeDir_ = storeHomeDir_;
if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
to_bitField0_ |= 0x00000010;
}
result.regionName_ = regionName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4442,6 +4512,9 @@ public final class WALProtos {
storeHomeDir_ = other.storeHomeDir_;
onChanged();
}
if (other.hasRegionName()) {
setRegionName(other.getRegionName());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -4869,6 +4942,58 @@ public final class WALProtos {
return this;
}
// optional bytes region_name = 7;
private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public boolean hasRegionName() {
return ((bitField0_ & 0x00000040) == 0x00000040);
}
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public com.google.protobuf.ByteString getRegionName() {
return regionName_;
}
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public Builder setRegionName(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000040;
regionName_ = value;
onChanged();
return this;
}
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public Builder clearRegionName() {
bitField0_ = (bitField0_ & ~0x00000040);
regionName_ = getDefaultInstance().getRegionName();
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:CompactionDescriptor)
}
@@ -5275,15 +5400,16 @@ public final class WALProtos {
"ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
"\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
"\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
"\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" +
"\nscope_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024Compac" +
"tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
"coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
" \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
"tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
"\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" +
"ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" +
"LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" +
"buf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
"\t\022\023\n\013region_name\030\007 \001(\014\"\014\n\nWALTrailer*F\n\t" +
"ScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034" +
"\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apac" +
"he.hadoop.hbase.protobuf.generatedB\tWALP",
"rotosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5313,7 +5439,7 @@ public final class WALProtos {
internal_static_CompactionDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CompactionDescriptor_descriptor,
new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", });
new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", "RegionName", });
internal_static_WALTrailer_descriptor =
getDescriptor().getMessageTypes().get(4);
internal_static_WALTrailer_fieldAccessorTable = new

View File

@@ -92,6 +92,7 @@ message CompactionDescriptor {
repeated string compaction_input = 4;
repeated string compaction_output = 5;
required string store_home_dir = 6;
optional bytes region_name = 7; // full region name
}
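Because region_name is an optional field, descriptors written by older servers still parse and simply report hasRegionName() == false, so replay code must gate on that check. A minimal round-trip sketch against the generated classes above, with placeholder field values; the demo class is not part of this patch:

import com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;

// Hypothetical demo, not part of this patch; run with assertions enabled (-ea).
final class RegionNameFieldSketch {
  static void roundTrip(byte[] fullRegionName) throws Exception {
    CompactionDescriptor cd = CompactionDescriptor.newBuilder()
        .setTableName(ByteString.copyFromUtf8("t1"))          // required field 1
        .setEncodedRegionName(ByteString.copyFromUtf8("abc")) // required field 2
        .setFamilyName(ByteString.copyFromUtf8("f"))          // required field 3
        .setStoreHomeDir("f")                                 // required field 6
        .setRegionName(ByteString.copyFrom(fullRegionName))   // new optional field 7
        .build();
    CompactionDescriptor parsed = CompactionDescriptor.parseFrom(cd.toByteArray());
    // Old writers simply omit field 7; readers then see hasRegionName() == false.
    assert parsed.hasRegionName();
    assert parsed.getRegionName().equals(ByteString.copyFrom(fullRegionName));
  }
}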
/**

View File

@@ -1625,7 +1625,7 @@ public class HRegion implements HeapSize { // , Writable{
*/
boolean shouldFlush() {
// This is a rough measure.
if (this.lastFlushSeqId > 0
&& (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
return true;
}
@@ -2195,7 +2195,7 @@ public class HRegion implements HeapSize { // , Writable{
public boolean isInReplay() {
return false;
}
@Override
public long getReplaySequenceId() {
return 0;
@@ -2234,7 +2234,7 @@ public class HRegion implements HeapSize { // , Writable{
public boolean isInReplay() {
return true;
}
@Override
public long getReplaySequenceId() {
return this.replaySeqId;
@@ -3295,7 +3295,7 @@ public class HRegion implements HeapSize { // , Writable{
firstSeqIdInLog = key.getLogSeqNum();
}
currentEditSeqId = key.getLogSeqNum();
currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
key.getOrigLogSeqNum() : currentEditSeqId;
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
@@ -6243,7 +6243,7 @@ public class HRegion implements HeapSize { // , Writable{
WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
return key;
}
/**
* Explicitly sync wal
* @throws IOException

View File

@@ -26,8 +26,10 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -140,6 +142,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
@@ -640,25 +643,37 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
int i = 0;
for (HLogSplitter.MutationReplay m : mutations) {
for (Iterator<HLogSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
HLogSplitter.MutationReplay m = it.next();
if (m.type == MutationType.PUT) {
batchContainsPuts = true;
} else {
batchContainsDelete = true;
}
mArray[i++] = m;
NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
if (metaCells != null && !metaCells.isEmpty()) {
for (Cell metaCell : metaCells) {
CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
if (compactionDesc != null) {
region.completeCompactionMarker(compactionDesc);
}
}
it.remove();
}
}
requestCount.add(mutations.size());
if (!region.getRegionInfo().isMetaTable()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
return region.batchReplay(mArray, replaySeqId);
return region.batchReplay(mutations.toArray(
new HLogSplitter.MutationReplay[mutations.size()]), replaySeqId);
} finally {
if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -1355,7 +1370,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
walEntries.add(walEntry);
}
if(edits!=null && !edits.isEmpty()) {
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
// check if it's a partial success
@@ -1366,7 +1381,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
}
//sync wal at the end because ASYNC_WAL is used above
region.syncWal();

View File

@@ -80,7 +80,6 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -92,6 +91,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
@@ -241,7 +241,7 @@ public class HLogSplitter {
List<Path> splits = new ArrayList<Path>();
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
RecoveryMode.LOG_SPLITTING);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
@@ -811,6 +811,7 @@ public class HLogSplitter {
k.internEncodedRegionName(this.encodedRegionName);
}
@Override
public long heapSize() {
return heapInBuffer;
}
@@ -825,6 +826,7 @@ public class HLogSplitter {
outputSink = sink;
}
@Override
public void run() {
try {
doRun();
@@ -1060,6 +1062,7 @@ public class HLogSplitter {
TimeUnit.SECONDS, new ThreadFactory() {
private int count = 1;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "split-log-closeStream-" + count++);
return t;
@@ -1070,6 +1073,7 @@ public class HLogSplitter {
for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
LOG.debug("Closing " + wap.p);
@@ -1242,6 +1246,7 @@ public class HLogSplitter {
return (new WriterAndPath(regionedits, w));
}
@Override
void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
@@ -1280,6 +1285,7 @@ public class HLogSplitter {
/**
* @return a map from encoded region ID to the number of edits written out for that region.
*/
@Override
Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
synchronized (writers) {
@@ -1368,6 +1374,7 @@ public class HLogSplitter {
this.logRecoveredEditsOutputSink.setReporter(reporter);
}
@Override
void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
@@ -1449,19 +1456,40 @@ public class HLogSplitter {
HConnection hconn = this.getConnectionByTableName(table);
for (KeyValue kv : kvs) {
// filtering HLog meta entries
// We don't handle HBASE-2231 because we may or may not replay a compaction event.
// Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
// page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
byte[] row = kv.getRow();
byte[] family = kv.getFamily();
boolean isCompactionEntry = false;
if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) {
skippedKVs.add(kv);
continue;
CompactionDescriptor compaction = WALEdit.getCompaction(kv);
if (compaction != null && compaction.hasRegionName()) {
try {
byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
.toByteArray());
row = regionName[1]; // startKey of the region
family = compaction.getFamilyName().toByteArray();
isCompactionEntry = true;
} catch (Exception ex) {
LOG.warn("Unexpected exception received, ignoring " + ex);
skippedKVs.add(kv);
continue;
}
} else {
skippedKVs.add(kv);
continue;
}
}
try {
loc =
locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
encodeRegionNameStr);
// skip replaying the compaction if the region is gone
if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
loc.getRegionInfo().getEncodedName())) {
LOG.info("Not replaying a compaction marker for an older region: "
+ encodeRegionNameStr);
needSkip = true;
}
} catch (TableNotFoundException ex) {
// table has been deleted so skip edits of the table
LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
@@ -1490,7 +1518,7 @@ public class HLogSplitter {
regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
}
if (maxStoreSequenceIds != null) {
Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily());
Long maxStoreSeqId = maxStoreSequenceIds.get(family);
if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
// skip current kv if column family doesn't exist anymore or already flushed
skippedKVs.add(kv);
@@ -1768,6 +1796,7 @@ public class HLogSplitter {
return result;
}
@Override
Map<byte[], Long> getOutputCounts() {
TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
synchronized (writers) {
@@ -1922,7 +1951,7 @@ public class HLogSplitter {
return new ArrayList<MutationReplay>();
}
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
int count = entry.getAssociatedCellCount();
List<MutationReplay> mutations = new ArrayList<MutationReplay>();
@@ -1979,7 +2008,7 @@ public class HLogSplitter {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
}
key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
.getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
walKey.getNonceGroup(), walKey.getNonce());
logEntry.setFirst(key);
logEntry.setSecond(val);

View File

@@ -262,7 +262,7 @@ public class HLogUtil {
final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
TableName tn = TableName.valueOf(c.getTableName().toByteArray());
HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false, null);
log.appendNoSync(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
log.sync();
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));

View File

@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -145,6 +146,7 @@ public class WALEdit implements Writable, HeapSize {
return result;
}
@Override
public void readFields(DataInput in) throws IOException {
kvs.clear();
if (scopes != null) {
@@ -180,6 +182,7 @@ public class WALEdit implements Writable, HeapSize {
}
}
@Override
public void write(DataOutput out) throws IOException {
LOG.warn("WALEdit is being serialized to writable - only expected in test code");
out.writeInt(VERSION_2);
@@ -222,6 +225,7 @@ public class WALEdit implements Writable, HeapSize {
return kvs.size();
}
@Override
public long heapSize() {
long ret = ClassSize.ARRAYLIST;
for (KeyValue kv : kvs) {
@@ -235,6 +239,7 @@ public class WALEdit implements Writable, HeapSize {
return ret;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -249,25 +254,36 @@ public class WALEdit implements Writable, HeapSize {
sb.append(">]");
return sb.toString();
}
/**
* Create a compaction WALEdit
* @param hri the region this compaction happened in
* @param c the compaction descriptor to serialize
* @return A WALEdit that has <code>c</code> serialized as its value
*/
public static WALEdit createCompaction(final CompactionDescriptor c) {
public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
byte [] pbbytes = c.toByteArray();
KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
System.currentTimeMillis(), pbbytes);
return new WALEdit().add(kv); //replication scope null so that this won't be replicated
}
private static byte[] getRowForRegion(HRegionInfo hri) {
byte[] startKey = hri.getStartKey();
if (startKey.length == 0) {
// empty row key is not allowed in mutations because it is both the start key and the end key
// we return the smallest byte[] that is bigger (in lex comparison) than byte[0].
return new byte[] {0};
}
return startKey;
}
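The substitute row matters because compaction markers are now routed like ordinary mutations. A minimal sketch of the ordering property getRowForRegion relies on, using org.apache.hadoop.hbase.util.Bytes; the demo class is not part of this patch:

import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical demo, not part of this patch; run with assertions enabled (-ea).
final class RowForRegionSketch {
  public static void main(String[] args) {
    byte[] empty = new byte[0];         // not allowed as a mutation row
    byte[] substitute = new byte[] {0}; // what getRowForRegion returns instead
    // {0} sorts immediately after the empty key, so it still lands inside a
    // region whose start key is empty, ahead of any real one-byte row.
    assert Bytes.compareTo(empty, substitute) < 0;
    assert Bytes.compareTo(substitute, new byte[] {1}) < 0;
  }
}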
/**
* Deserializes and returns a CompactionDescriptor if the Cell contains one.
* @param kv the cell to check
* @return deserialized CompactionDescriptor or null.
*/
public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException {
if (CellUtil.matchingRow(kv, METAROW) && CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
return CompactionDescriptor.parseFrom(kv.getValue());
}
return null;
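With the parameter widened from KeyValue to Cell, any cell-bearing code path can probe for a marker. A hedged usage sketch mirroring the replay-side callers earlier in this commit; the class and method names are illustrative only:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Hypothetical helper, not part of this patch.
final class CompactionMarkerScan {
  // Returns the first compaction descriptor found among the given cells, or null.
  static CompactionDescriptor firstMarker(List<Cell> cells) throws IOException {
    for (Cell cell : cells) {
      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
        continue; // only METAFAMILY cells can carry compaction markers
      }
      CompactionDescriptor cd = WALEdit.getCompaction(cell);
      if (cd != null) {
        return cd;
      }
    }
    return null;
  }
}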

View File

@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -234,8 +233,6 @@ public class WALEditsReplaySink {
List<KeyValue> kvs = edit.getKeyValues();
for (KeyValue kv : kvs) {
// filtering HLog meta entries
if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) continue;
setLocation(conn.locateRegion(tableName, kv.getRow()));
skip = true;
break;

View File

@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hdfs.DFSClient;
@@ -52,6 +56,8 @@ import org.apache.log4j.Level;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
/**
* Test for the case where a regionserver going down has enough cycles to do damage to regions
* that have actually been assigned elsewhere.
@@ -215,7 +221,8 @@ public class TestIOFencing {
*/
@Test
public void testFencingAroundCompaction() throws Exception {
doTest(BlockCompactionsInPrepRegion.class);
doTest(BlockCompactionsInPrepRegion.class, false);
doTest(BlockCompactionsInPrepRegion.class, true);
}
/**
@@ -226,12 +233,13 @@ public class TestIOFencing {
*/
@Test
public void testFencingAroundCompactionAfterWALSync() throws Exception {
doTest(BlockCompactionsInCompletionRegion.class);
doTest(BlockCompactionsInCompletionRegion.class, false);
doTest(BlockCompactionsInCompletionRegion.class, true);
}
public void doTest(Class<?> regionClass) throws Exception {
public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
Configuration c = TEST_UTIL.getConfiguration();
c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
// Insert our custom region
c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
c.setBoolean("dfs.support.append", true);
@@ -264,6 +272,16 @@ public class TestIOFencing {
// Load some rows
TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
// add a compaction from an older (non-existing) region to see whether we successfully skip
// those entries
HRegionInfo oldHri = new HRegionInfo(table.getName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = System.currentTimeMillis();
while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
@@ -281,18 +299,24 @@ public class TestIOFencing {
compactingRegion.waitForCompactionToBlock();
LOG.info("Starting a new server");
RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
HRegionServer newServer = newServerThread.getRegionServer();
final HRegionServer newServer = newServerThread.getRegionServer();
LOG.info("Killing region server ZK lease");
TEST_UTIL.expireRegionServerSession(0);
CompactionBlockerRegion newRegion = null;
startWaitTime = System.currentTimeMillis();
while (newRegion == null) {
LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
Thread.sleep(1000);
newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
assertTrue("Timed out waiting for new server to open region",
System.currentTimeMillis() - startWaitTime < 300000);
}
LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
// wait for region to be assigned and to go out of log replay if applicable
Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
HRegion newRegion = newServer.getOnlineRegion(REGION_NAME);
return newRegion != null && !newRegion.isRecovering();
}
});
newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
LOG.info("Allowing compaction to proceed");
compactingRegion.allowCompactions();
while (compactingRegion.compactCount == 0) {

View File

@@ -657,11 +657,13 @@ public class TestHRegion {
long time = System.nanoTime();
WALEdit edit = null;
if (i == maxSeqId) {
edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder()
edit = WALEdit.createCompaction(region.getRegionInfo(),
CompactionDescriptor.newBuilder()
.setTableName(ByteString.copyFrom(tableName.getName()))
.setFamilyName(ByteString.copyFrom(regionName))
.setEncodedRegionName(ByteString.copyFrom(regionName))
.setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
.setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
.build());
} else {
edit = new WALEdit();
@@ -753,7 +755,8 @@ public class TestHRegion {
long time = System.nanoTime();
writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time,
HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(compactionDescriptor)));
HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
compactionDescriptor)));
writer.close();
// close the region now, and reopen again

View File

@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
@@ -491,10 +492,12 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testCompactionWALEdits() throws Exception {
WALProtos.CompactionDescriptor compactionDescriptor =
WALProtos.CompactionDescriptor.getDefaultInstance();
WALEdit edit = WALEdit.createCompaction(compactionDescriptor);
HRegionInfo hri = new HRegionInfo(htable1.getName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit);
}
/**
* Test for HBASE-8663
* Create two new Tables with colfamilies enabled for replication then run