HBASE-27644 Should not return false when WALKey has no following KVs while reading WAL file (#5032)
Signed-off-by: Viraj Jasani <vjasani@apache.org>
(cherry picked from commit 4a9cf99b2f
)
This commit is contained in:
parent
a6ca9cb61e
commit
82c4ccefd3
|
@ -408,10 +408,9 @@ public class ProtobufLogReader extends ReaderBase {
|
||||||
WALKey walKey = builder.build();
|
WALKey walKey = builder.build();
|
||||||
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
|
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
|
||||||
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
|
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
|
||||||
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
|
LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
|
||||||
this.inputStream.getPos());
|
this.inputStream.getPos());
|
||||||
seekOnFs(originalPosition);
|
return true;
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
int expectedCells = walKey.getFollowingKvCount();
|
int expectedCells = walKey.getFollowingKvCount();
|
||||||
long posBefore = this.inputStream.getPos();
|
long posBefore = this.inputStream.getPos();
|
||||||
|
|
|
@ -276,7 +276,7 @@ public class TestWALSplit {
|
||||||
* log entry. Does its writing as an alternate user in another filesystem instance to simulate
|
* log entry. Does its writing as an alternate user in another filesystem instance to simulate
|
||||||
* better it being a regionserver.
|
* better it being a regionserver.
|
||||||
*/
|
*/
|
||||||
class ZombieLastLogWriterRegionServer extends Thread {
|
private class ZombieLastLogWriterRegionServer extends Thread {
|
||||||
final AtomicLong editsCount;
|
final AtomicLong editsCount;
|
||||||
final AtomicBoolean stop;
|
final AtomicBoolean stop;
|
||||||
final int numOfWriters;
|
final int numOfWriters;
|
||||||
|
@ -402,10 +402,6 @@ public class TestWALSplit {
|
||||||
|
|
||||||
private Path createRecoveredEditsPathForRegion() throws IOException {
|
private Path createRecoveredEditsPathForRegion() throws IOException {
|
||||||
byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
|
byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
Entry entry = new Entry(
|
|
||||||
new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
|
|
||||||
new WALEdit());
|
|
||||||
Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
|
Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
|
||||||
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
|
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
|
||||||
return p;
|
return p;
|
||||||
|
@ -491,10 +487,10 @@ public class TestWALSplit {
|
||||||
assertEquals(11, countWAL(splitLog[0]));
|
assertEquals(11, countWAL(splitLog[0]));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Tests that WalSplitter ignores replication marker edits.
|
* Tests that WalSplitter ignores replication marker edits.
|
||||||
*/
|
*/
|
||||||
@Test(timeout = 30000)
|
@Test
|
||||||
public void testSplitRemovesReplicationMarkerEdits() throws IOException {
|
public void testSplitRemovesReplicationMarkerEdits() throws IOException {
|
||||||
RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
|
RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
|
||||||
Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
|
Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
|
||||||
|
@ -1206,7 +1202,44 @@ public class TestWALSplit {
|
||||||
} finally {
|
} finally {
|
||||||
conf.unset(HConstants.WAL_STORAGE_POLICY);
|
conf.unset(HConstants.WAL_STORAGE_POLICY);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* See HBASE-27644, typically we should not have empty WALEdit but we should be able to process
|
||||||
|
* it, instead of losing data after it.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testEmptyWALEdit() throws IOException {
|
||||||
|
final String region = "region__5";
|
||||||
|
REGIONS.clear();
|
||||||
|
REGIONS.add(region);
|
||||||
|
makeRegionDirs(REGIONS);
|
||||||
|
fs.mkdirs(WALDIR);
|
||||||
|
Path path = new Path(WALDIR, WAL_FILE_PREFIX + 5);
|
||||||
|
generateEmptyEditWAL(path, Bytes.toBytes(region));
|
||||||
|
useDifferentDFSClient();
|
||||||
|
|
||||||
|
Path regiondir = new Path(TABLEDIR, region);
|
||||||
|
fs.mkdirs(regiondir);
|
||||||
|
List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
|
||||||
|
// Make sure that WALSplitter generate the split file
|
||||||
|
assertEquals(1, splitPaths.size());
|
||||||
|
|
||||||
|
Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
|
||||||
|
assertEquals(11, countWAL(originalLog));
|
||||||
|
// we will skip the empty WAL when splitting
|
||||||
|
assertEquals(10, countWAL(splitPaths.get(0)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void generateEmptyEditWAL(Path path, byte[] region) throws IOException {
|
||||||
|
fs.mkdirs(WALDIR);
|
||||||
|
try (Writer writer = wals.createWALWriter(fs, path)) {
|
||||||
|
long seq = 0;
|
||||||
|
appendEmptyEntry(writer, TABLE_NAME, region, seq++);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
appendEntry(writer, TABLE_NAME, region, Bytes.toBytes(i), FAMILY, QUALIFIER, VALUE, seq++);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Writer generateWALs(int leaveOpen) throws IOException {
|
private Writer generateWALs(int leaveOpen) throws IOException {
|
||||||
|
@ -1399,7 +1432,7 @@ public class TestWALSplit {
|
||||||
w.sync(false);
|
w.sync(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
|
private static long appendEntry(Writer writer, TableName table, byte[] region, byte[] row,
|
||||||
byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
|
byte[] family, byte[] qualifier, byte[] value, long seq) throws IOException {
|
||||||
LOG.info(Thread.currentThread().getName() + " append");
|
LOG.info(Thread.currentThread().getName() + " append");
|
||||||
writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
|
writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
|
||||||
|
@ -1412,13 +1445,27 @@ public class TestWALSplit {
|
||||||
byte[] qualifier, byte[] value, long seq) {
|
byte[] qualifier, byte[] value, long seq) {
|
||||||
long time = System.nanoTime();
|
long time = System.nanoTime();
|
||||||
|
|
||||||
seq++;
|
|
||||||
final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
|
final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
|
||||||
WALEdit edit = new WALEdit();
|
WALEdit edit = new WALEdit();
|
||||||
edit.add(cell);
|
edit.add(cell);
|
||||||
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
|
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static long appendEmptyEntry(Writer writer, TableName table, byte[] region, long seq)
|
||||||
|
throws IOException {
|
||||||
|
LOG.info(Thread.currentThread().getName() + " append");
|
||||||
|
writer.append(createEmptyEntry(table, region, seq));
|
||||||
|
LOG.info(Thread.currentThread().getName() + " sync");
|
||||||
|
writer.sync(false);
|
||||||
|
return seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Entry createEmptyEntry(TableName table, byte[] region, long seq) {
|
||||||
|
long time = System.nanoTime();
|
||||||
|
return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID),
|
||||||
|
new WALEdit());
|
||||||
|
}
|
||||||
|
|
||||||
private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
|
private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
|
||||||
Writer writer =
|
Writer writer =
|
||||||
WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
|
WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
|
||||||
|
@ -1428,22 +1475,19 @@ public class TestWALSplit {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
|
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
|
||||||
Reader in1, in2;
|
try (Reader in1 = wals.createReader(fs, p1); Reader in2 = wals.createReader(fs, p2)) {
|
||||||
in1 = wals.createReader(fs, p1);
|
Entry entry1;
|
||||||
in2 = wals.createReader(fs, p2);
|
Entry entry2;
|
||||||
Entry entry1;
|
while ((entry1 = in1.next()) != null) {
|
||||||
Entry entry2;
|
entry2 = in2.next();
|
||||||
while ((entry1 = in1.next()) != null) {
|
if (
|
||||||
entry2 = in2.next();
|
(entry1.getKey().compareTo(entry2.getKey()) != 0)
|
||||||
if (
|
|| (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
|
||||||
(entry1.getKey().compareTo(entry2.getKey()) != 0)
|
) {
|
||||||
|| (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))
|
return false;
|
||||||
) {
|
}
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
in1.close();
|
|
||||||
in2.close();
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue