From c29355ecd7bd1f2fd2a37509560204ceaa29f888 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Sun, 26 Feb 2023 16:14:25 +0800 Subject: [PATCH] HBASE-27644 Should not return false when WALKey has no following KVs while reading WAL file (#5032) Signed-off-by: Viraj Jasani (cherry picked from commit 4a9cf99b2f8cd2fa646a5c43280e2d91d2dccafd) Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java --- .../regionserver/wal/ProtobufLogReader.java | 5 +- .../apache/hadoop/hbase/wal/TestWALSplit.java | 86 ++++++++++++++----- 2 files changed, 67 insertions(+), 24 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 2e3fb4f5b5f..d562fe705ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -408,10 +408,9 @@ public class ProtobufLogReader extends ReaderBase { WALKey walKey = builder.build(); entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { - LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}", + LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}", this.inputStream.getPos()); - seekOnFs(originalPosition); - return false; + return true; } int expectedCells = walKey.getFollowingKvCount(); long posBefore = this.inputStream.getPos(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index c14c589a919..fec532f89da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -270,7 +270,7 @@ public class TestWALSplit { * log entry. Does its writing as an alternate user in another filesystem instance to simulate * better it being a regionserver. */ - class ZombieLastLogWriterRegionServer extends Thread { + private class ZombieLastLogWriterRegionServer extends Thread { final AtomicLong editsCount; final AtomicBoolean stop; final int numOfWriters; @@ -396,10 +396,6 @@ public class TestWALSplit { private Path createRecoveredEditsPathForRegion() throws IOException { byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); - long now = EnvironmentEdgeManager.currentTime(); - Entry entry = new Entry( - new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), - new WALEdit()); Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT, TMPDIRNAME, conf); return p; @@ -1167,7 +1163,44 @@ public class TestWALSplit { } finally { conf.unset(HConstants.WAL_STORAGE_POLICY); } + } + /** + * See HBASE-27644, typically we should not have empty WALEdit but we should be able to process + * it, instead of losing data after it. + */ + @Test + public void testEmptyWALEdit() throws IOException { + final String region = "region__5"; + REGIONS.clear(); + REGIONS.add(region); + makeRegionDirs(REGIONS); + fs.mkdirs(WALDIR); + Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5); + generateEmptyEditWAL(path, Bytes.toBytes(region)); + useDifferentDFSClient(); + + Path regiondir = new Path(TABLEDIR, region); + fs.mkdirs(regiondir); + List splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + // Make sure that WALSplitter generate the split file + assertEquals(1, splitPaths.size()); + + Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath(); + assertEquals(11, countWAL(originalLog)); + // we will skip the empty WAL when splitting + assertEquals(10, countWAL(splitPaths.get(0))); + } + + private void generateEmptyEditWAL(Path path, byte[] region) throws IOException { + fs.mkdirs(WALDIR); + try (Writer writer = wals.createWALWriter(fs, path)) { + long seq = 0; + appendEmptyEntry(writer, TABLE_NAME, region, seq++); + for (int i = 0; i < 10; i++) { + appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++); + } + } } private Writer generateWALs(int leaveOpen) throws IOException { @@ -1360,7 +1393,7 @@ public class TestWALSplit { w.sync(false); } - public static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row, + private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row, byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException { LOG.info(Thread.currentThread().getName() + " append"); writer.append(createTestEntry(table, region, row, family, qualifier, value, seq)); @@ -1373,13 +1406,27 @@ public class TestWALSplit { byte[] qualifier, byte[] value, long seq) { long time = System.nanoTime(); - seq++; final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value); WALEdit edit = new WALEdit(); edit.add(cell); return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit); } + private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq) + throws IOException { + LOG.info(Thread.currentThread().getName() + " append"); + writer.append(createEmptyEntry(table, region, seq)); + LOG.info(Thread.currentThread().getName() + " sync"); + writer.sync(false); + return seq; + } + + private static Entry createEmptyEntry(TableName table, byte[] region, long seq) { + long time = System.nanoTime(); + return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), + new WALEdit()); + } + private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { Writer writer = WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); @@ -1389,22 +1436,19 @@ public class TestWALSplit { } private boolean logsAreEqual(Path p1, Path p2) throws IOException { - Reader in1, in2; - in1 = wals.createReader(fs, p1); - in2 = wals.createReader(fs, p2); - Entry entry1; - Entry entry2; - while ((entry1 = in1.next()) != null) { - entry2 = in2.next(); - if ( - (entry1.getKey().compareTo(entry2.getKey()) != 0) - || (!entry1.getEdit().toString().equals(entry2.getEdit().toString())) - ) { - return false; + try (Reader in1 = wals.createReader(fs, p1); Reader in2 = wals.createReader(fs, p2)) { + Entry entry1; + Entry entry2; + while ((entry1 = in1.next()) != null) { + entry2 = in2.next(); + if ( + (entry1.getKey().compareTo(entry2.getKey()) != 0) + || (!entry1.getEdit().toString().equals(entry2.getEdit().toString())) + ) { + return false; + } } } - in1.close(); - in2.close(); return true; } }