HBASE-16754 All WALSplitter OutputSinks should process compaction events
This commit is contained in:
parent
5bc518b387
commit
317136e272
|
@ -363,12 +363,21 @@ public class WALEdit implements Writable, HeapSize {
|
|||
* @return deserialized CompactionDescriptor or null.
|
||||
*/
|
||||
public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
|
||||
if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
|
||||
if (isCompactionMarker(kv)) {
|
||||
return CompactionDescriptor.parseFrom(CellUtil.cloneValue(kv));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the given cell is a serialized {@link CompactionDescriptor}
|
||||
*
|
||||
* @see #getCompaction(Cell)
|
||||
*/
|
||||
public static boolean isCompactionMarker(Cell cell) {
|
||||
return CellUtil.matchingColumn(cell, METAFAMILY, COMPACTION);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a bulk loader WALEdit
|
||||
*
|
||||
|
|
|
@ -361,7 +361,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean keepRegionEvents() {
|
||||
public boolean keepRegionEvent(Entry entry) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -358,7 +358,7 @@ public class WALSplitter {
|
|||
continue;
|
||||
}
|
||||
// Don't send Compaction/Close/Open region events to recovered edit type sinks.
|
||||
if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvents()) {
|
||||
if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
|
||||
editsSkipped++;
|
||||
continue;
|
||||
}
|
||||
|
@ -1277,12 +1277,11 @@ public class WALSplitter {
|
|||
|
||||
/**
|
||||
* Some WALEdit's contain only KV's for account on what happened to a region.
|
||||
* Not all sinks will want to get those edits.
|
||||
* Not all sinks will want to get all of those edits.
|
||||
*
|
||||
* @return Return true if this sink wants to get all WALEdit's regardless of if it's a region
|
||||
* event.
|
||||
* @return Return true if this sink wants to accept this region-level WALEdit.
|
||||
*/
|
||||
public abstract boolean keepRegionEvents();
|
||||
public abstract boolean keepRegionEvent(Entry entry);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1627,7 +1626,13 @@ public class WALSplitter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean keepRegionEvents() {
|
||||
public boolean keepRegionEvent(Entry entry) {
|
||||
ArrayList<Cell> cells = entry.getEdit().getCells();
|
||||
for (int i = 0; i < cells.size(); i++) {
|
||||
if (WALEdit.isCompactionMarker(cells.get(i))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -2082,7 +2087,7 @@ public class WALSplitter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean keepRegionEvents() {
|
||||
public boolean keepRegionEvent(Entry entry) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.io.IOException;
|
|||
import java.lang.reflect.Method;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
|
||||
|
@ -449,6 +451,39 @@ public class TestWALSplit {
|
|||
assertEquals(1, splitLog.length);
|
||||
|
||||
assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
|
||||
// split log should only have the test edits
|
||||
assertEquals(10, countWAL(splitLog[0]));
|
||||
}
|
||||
|
||||
|
||||
@Test (timeout=300000)
|
||||
public void testSplitLeavesCompactionEventsEdits() throws IOException{
|
||||
HRegionInfo hri = new HRegionInfo(TABLE_NAME);
|
||||
REGIONS.clear();
|
||||
REGIONS.add(hri.getEncodedName());
|
||||
Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
|
||||
LOG.info("Creating region directory: " + regionDir);
|
||||
assertTrue(fs.mkdirs(regionDir));
|
||||
|
||||
Writer writer = generateWALs(1, 10, 0, 10);
|
||||
String[] compactInputs = new String[]{"file1", "file2", "file3"};
|
||||
String compactOutput = "file4";
|
||||
appendCompactionEvent(writer, hri, compactInputs, compactOutput);
|
||||
writer.close();
|
||||
|
||||
useDifferentDFSClient();
|
||||
WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
|
||||
|
||||
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
|
||||
// original log should have 10 test edits, 10 region markers, 1 compaction marker
|
||||
assertEquals(21, countWAL(originalLog));
|
||||
|
||||
Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, hri.getEncodedName());
|
||||
assertEquals(1, splitLog.length);
|
||||
|
||||
assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
|
||||
// split log should have 10 test edits plus 1 compaction marker
|
||||
assertEquals(11, countWAL(splitLog[0]));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1300,6 +1335,24 @@ public class TestWALSplit {
|
|||
return count;
|
||||
}
|
||||
|
||||
private static void appendCompactionEvent(Writer w, HRegionInfo hri, String[] inputs,
|
||||
String output) throws IOException {
|
||||
WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
|
||||
desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
|
||||
.setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
|
||||
.setRegionName(ByteString.copyFrom(hri.getRegionName()))
|
||||
.setFamilyName(ByteString.copyFrom(FAMILY))
|
||||
.setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
|
||||
.addAllCompactionInput(Arrays.asList(inputs))
|
||||
.addCompactionOutput(output);
|
||||
|
||||
WALEdit edit = WALEdit.createCompaction(hri, desc.build());
|
||||
WALKey key = new WALKey(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
|
||||
EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
|
||||
w.append(new Entry(key, edit));
|
||||
w.sync();
|
||||
}
|
||||
|
||||
private static void appendRegionEvent(Writer w, String region) throws IOException {
|
||||
WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
|
||||
WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
|
||||
|
@ -1315,6 +1368,7 @@ public class TestWALSplit {
|
|||
HConstants.DEFAULT_CLUSTER_ID);
|
||||
w.append(
|
||||
new Entry(walKey, new WALEdit().add(kv)));
|
||||
w.sync();
|
||||
}
|
||||
|
||||
public static long appendEntry(Writer writer, TableName table, byte[] region,
|
||||
|
|
Loading…
Reference in New Issue