HBASE-27644 Should not return false when WALKey has no following KVs while reading WAL file (#5032)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
4b7815d552
commit
4a9cf99b2f
|
@ -408,10 +408,9 @@ public class ProtobufLogReader extends ReaderBase {
|
|||
WALKey walKey = builder.build();
|
||||
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
|
||||
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
|
||||
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
|
||||
LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
|
||||
this.inputStream.getPos());
|
||||
seekOnFs(originalPosition);
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
int expectedCells = walKey.getFollowingKvCount();
|
||||
long posBefore = this.inputStream.getPos();
|
||||
|
|
|
@ -276,7 +276,7 @@ public class TestWALSplit {
|
|||
* log entry. Does its writing as an alternate user in another filesystem instance to simulate
|
||||
* better it being a regionserver.
|
||||
*/
|
||||
class ZombieLastLogWriterRegionServer extends Thread {
|
||||
private class ZombieLastLogWriterRegionServer extends Thread {
|
||||
final AtomicLong editsCount;
|
||||
final AtomicBoolean stop;
|
||||
final int numOfWriters;
|
||||
|
@ -402,10 +402,6 @@ public class TestWALSplit {
|
|||
|
||||
private Path createRecoveredEditsPathForRegion() throws IOException {
|
||||
byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
Entry entry = new Entry(
|
||||
new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
|
||||
new WALEdit());
|
||||
Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
|
||||
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
|
||||
return p;
|
||||
|
@ -491,10 +487,10 @@ public class TestWALSplit {
|
|||
assertEquals(11, countWAL(splitLog[0]));
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* Tests that WalSplitter ignores replication marker edits.
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
@Test
|
||||
public void testSplitRemovesReplicationMarkerEdits() throws IOException {
|
||||
RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
|
||||
Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
|
||||
|
@ -1206,7 +1202,44 @@ public class TestWALSplit {
|
|||
} finally {
|
||||
conf.unset(HConstants.WAL_STORAGE_POLICY);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* See HBASE-27644, typically we should not have empty WALEdit but we should be able to process
|
||||
* it, instead of losing data after it.
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyWALEdit() throws IOException {
|
||||
final String region = "region__5";
|
||||
REGIONS.clear();
|
||||
REGIONS.add(region);
|
||||
makeRegionDirs(REGIONS);
|
||||
fs.mkdirs(WALDIR);
|
||||
Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5);
|
||||
generateEmptyEditWAL(path, Bytes.toBytes(region));
|
||||
useDifferentDFSClient();
|
||||
|
||||
Path regiondir = new Path(TABLEDIR, region);
|
||||
fs.mkdirs(regiondir);
|
||||
List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
|
||||
// Make sure that WALSplitter generate the split file
|
||||
assertEquals(1, splitPaths.size());
|
||||
|
||||
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
|
||||
assertEquals(11, countWAL(originalLog));
|
||||
// we will skip the empty WAL when splitting
|
||||
assertEquals(10, countWAL(splitPaths.get(0)));
|
||||
}
|
||||
|
||||
private void generateEmptyEditWAL(Path path, byte[] region) throws IOException {
|
||||
fs.mkdirs(WALDIR);
|
||||
try (Writer writer = wals.createWALWriter(fs, path)) {
|
||||
long seq = 0;
|
||||
appendEmptyEntry(writer, TABLE_NAME, region, seq++);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Writer generateWALs(int leaveOpen) throws IOException {
|
||||
|
@ -1399,7 +1432,7 @@ public class TestWALSplit {
|
|||
w.sync(false);
|
||||
}
|
||||
|
||||
public static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
|
||||
private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
|
||||
byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
|
||||
LOG.info(Thread.currentThread().getName() + " append");
|
||||
writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
|
||||
|
@ -1412,13 +1445,27 @@ public class TestWALSplit {
|
|||
byte[] qualifier, byte[] value, long seq) {
|
||||
long time = System.nanoTime();
|
||||
|
||||
seq++;
|
||||
final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
|
||||
WALEdit edit = new WALEdit();
|
||||
edit.add(cell);
|
||||
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
|
||||
}
|
||||
|
||||
private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq)
|
||||
throws IOException {
|
||||
LOG.info(Thread.currentThread().getName() + " append");
|
||||
writer.append(createEmptyEntry(table, region, seq));
|
||||
LOG.info(Thread.currentThread().getName() + " sync");
|
||||
writer.sync(false);
|
||||
return seq;
|
||||
}
|
||||
|
||||
private static Entry createEmptyEntry(TableName table, byte[] region, long seq) {
|
||||
long time = System.nanoTime();
|
||||
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID),
|
||||
new WALEdit());
|
||||
}
|
||||
|
||||
private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
|
||||
Writer writer =
|
||||
WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
|
||||
|
@ -1428,22 +1475,19 @@ public class TestWALSplit {
|
|||
}
|
||||
|
||||
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
|
||||
Reader in1, in2;
|
||||
in1 = wals.createReader(fs, p1);
|
||||
in2 = wals.createReader(fs, p2);
|
||||
Entry entry1;
|
||||
Entry entry2;
|
||||
while ((entry1 = in1.next()) != null) {
|
||||
entry2 = in2.next();
|
||||
if (
|
||||
(entry1.getKey().compareTo(entry2.getKey()) != 0)
|
||||
|| (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
|
||||
) {
|
||||
return false;
|
||||
try (Reader in1 = wals.createReader(fs, p1); Reader in2 = wals.createReader(fs, p2)) {
|
||||
Entry entry1;
|
||||
Entry entry2;
|
||||
while ((entry1 = in1.next()) != null) {
|
||||
entry2 = in2.next();
|
||||
if (
|
||||
(entry1.getKey().compareTo(entry2.getKey()) != 0)
|
||||
|| (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
in1.close();
|
||||
in2.close();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue