HBASE-27644 Should not return false when WALKey has no following KVs while reading WAL file (#5032)
Signed-off-by: Viraj Jasani <vjasani@apache.org> (cherry picked from commit 4a9cf99b2f8cd2fa646a5c43280e2d91d2dccafd) Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
This commit is contained in:
parent
c49b5e7acc
commit
22093cb442
@ -386,10 +386,9 @@ public class ProtobufLogReader extends ReaderBase {
|
|||||||
WALKey walKey = builder.build();
|
WALKey walKey = builder.build();
|
||||||
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
|
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
|
||||||
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
|
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
|
||||||
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
|
LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
|
||||||
this.inputStream.getPos());
|
this.inputStream.getPos());
|
||||||
seekOnFs(originalPosition);
|
return true;
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
int expectedCells = walKey.getFollowingKvCount();
|
int expectedCells = walKey.getFollowingKvCount();
|
||||||
long posBefore = this.inputStream.getPos();
|
long posBefore = this.inputStream.getPos();
|
||||||
|
@ -270,7 +270,7 @@ public class TestWALSplit {
|
|||||||
* log entry. Does its writing as an alternate user in another filesystem instance to simulate
|
* log entry. Does its writing as an alternate user in another filesystem instance to simulate
|
||||||
* better it being a regionserver.
|
* better it being a regionserver.
|
||||||
*/
|
*/
|
||||||
class ZombieLastLogWriterRegionServer extends Thread {
|
private class ZombieLastLogWriterRegionServer extends Thread {
|
||||||
final AtomicLong editsCount;
|
final AtomicLong editsCount;
|
||||||
final AtomicBoolean stop;
|
final AtomicBoolean stop;
|
||||||
final int numOfWriters;
|
final int numOfWriters;
|
||||||
@ -396,10 +396,6 @@ public class TestWALSplit {
|
|||||||
|
|
||||||
private Path createRecoveredEditsPathForRegion() throws IOException {
|
private Path createRecoveredEditsPathForRegion() throws IOException {
|
||||||
byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
|
byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
Entry entry = new Entry(
|
|
||||||
new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
|
|
||||||
new WALEdit());
|
|
||||||
Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
|
Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
|
||||||
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
|
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
|
||||||
return p;
|
return p;
|
||||||
@ -1167,7 +1163,44 @@ public class TestWALSplit {
|
|||||||
} finally {
|
} finally {
|
||||||
conf.unset(HConstants.WAL_STORAGE_POLICY);
|
conf.unset(HConstants.WAL_STORAGE_POLICY);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* See HBASE-27644, typically we should not have empty WALEdit but we should be able to process
|
||||||
|
* it, instead of losing data after it.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testEmptyWALEdit() throws IOException {
|
||||||
|
final String region = "region__5";
|
||||||
|
REGIONS.clear();
|
||||||
|
REGIONS.add(region);
|
||||||
|
makeRegionDirs(REGIONS);
|
||||||
|
fs.mkdirs(WALDIR);
|
||||||
|
Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5);
|
||||||
|
generateEmptyEditWAL(path, Bytes.toBytes(region));
|
||||||
|
useDifferentDFSClient();
|
||||||
|
|
||||||
|
Path regiondir = new Path(TABLEDIR, region);
|
||||||
|
fs.mkdirs(regiondir);
|
||||||
|
List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
|
||||||
|
// Make sure that WALSplitter generate the split file
|
||||||
|
assertEquals(1, splitPaths.size());
|
||||||
|
|
||||||
|
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
|
||||||
|
assertEquals(11, countWAL(originalLog));
|
||||||
|
// we will skip the empty WAL when splitting
|
||||||
|
assertEquals(10, countWAL(splitPaths.get(0)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void generateEmptyEditWAL(Path path, byte[] region) throws IOException {
|
||||||
|
fs.mkdirs(WALDIR);
|
||||||
|
try (Writer writer = wals.createWALWriter(fs, path)) {
|
||||||
|
long seq = 0;
|
||||||
|
appendEmptyEntry(writer, TABLE_NAME, region, seq++);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Writer generateWALs(int leaveOpen) throws IOException {
|
private Writer generateWALs(int leaveOpen) throws IOException {
|
||||||
@ -1360,7 +1393,7 @@ public class TestWALSplit {
|
|||||||
w.sync(false);
|
w.sync(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
|
private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
|
||||||
byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
|
byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
|
||||||
LOG.info(Thread.currentThread().getName() + " append");
|
LOG.info(Thread.currentThread().getName() + " append");
|
||||||
writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
|
writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
|
||||||
@ -1373,13 +1406,27 @@ public class TestWALSplit {
|
|||||||
byte[] qualifier, byte[] value, long seq) {
|
byte[] qualifier, byte[] value, long seq) {
|
||||||
long time = System.nanoTime();
|
long time = System.nanoTime();
|
||||||
|
|
||||||
seq++;
|
|
||||||
final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
|
final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
|
||||||
WALEdit edit = new WALEdit();
|
WALEdit edit = new WALEdit();
|
||||||
edit.add(cell);
|
edit.add(cell);
|
||||||
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
|
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq)
|
||||||
|
throws IOException {
|
||||||
|
LOG.info(Thread.currentThread().getName() + " append");
|
||||||
|
writer.append(createEmptyEntry(table, region, seq));
|
||||||
|
LOG.info(Thread.currentThread().getName() + " sync");
|
||||||
|
writer.sync(false);
|
||||||
|
return seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Entry createEmptyEntry(TableName table, byte[] region, long seq) {
|
||||||
|
long time = System.nanoTime();
|
||||||
|
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID),
|
||||||
|
new WALEdit());
|
||||||
|
}
|
||||||
|
|
||||||
private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
|
private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
|
||||||
Writer writer =
|
Writer writer =
|
||||||
WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
|
WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
|
||||||
@ -1389,22 +1436,19 @@ public class TestWALSplit {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
|
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
|
||||||
Reader in1, in2;
|
try (Reader in1 = wals.createReader(fs, p1); Reader in2 = wals.createReader(fs, p2)) {
|
||||||
in1 = wals.createReader(fs, p1);
|
Entry entry1;
|
||||||
in2 = wals.createReader(fs, p2);
|
Entry entry2;
|
||||||
Entry entry1;
|
while ((entry1 = in1.next()) != null) {
|
||||||
Entry entry2;
|
entry2 = in2.next();
|
||||||
while ((entry1 = in1.next()) != null) {
|
if (
|
||||||
entry2 = in2.next();
|
(entry1.getKey().compareTo(entry2.getKey()) != 0)
|
||||||
if (
|
|| (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
|
||||||
(entry1.getKey().compareTo(entry2.getKey()) != 0)
|
) {
|
||||||
|| (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
|
return false;
|
||||||
) {
|
}
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
in1.close();
|
|
||||||
in2.close();
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user