HBASE-23587 The FSYNC_WAL flag does not work on branch-2.x (#974)
Signed-off-by: Guanghao Zhang <zghao@apache.org> Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
a693a8fd95
commit
26b1695df5
|
@ -347,13 +347,31 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
}
|
||||
}
|
||||
|
||||
// find all the sync futures between these two txids to see if we need to issue a hsync, if no
|
||||
// sync futures then just use the default one.
|
||||
private boolean isHsync(long beginTxid, long endTxid) {
|
||||
SortedSet<SyncFuture> futures =
|
||||
syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
|
||||
if (futures.isEmpty()) {
|
||||
return useHsync;
|
||||
}
|
||||
for (SyncFuture future : futures) {
|
||||
if (future.isForceSync()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void sync(AsyncWriter writer) {
|
||||
fileLengthAtLastSync = writer.getLength();
|
||||
long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
|
||||
boolean shouldUseHsync =
|
||||
isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
|
||||
highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
|
||||
final long startTimeNs = System.nanoTime();
|
||||
final long epoch = (long) epochAndState >>> 2L;
|
||||
addListener(writer.sync(useHsync), (result, error) -> {
|
||||
addListener(writer.sync(shouldUseHsync), (result, error) -> {
|
||||
if (error != null) {
|
||||
syncFailed(epoch, error);
|
||||
} else {
|
||||
|
|
|
@ -579,7 +579,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
Throwable lastException = null;
|
||||
try {
|
||||
TraceUtil.addTimelineAnnotation("syncing writer");
|
||||
writer.sync(useHsync);
|
||||
writer.sync(takeSyncFuture.isForceSync());
|
||||
TraceUtil.addTimelineAnnotation("writer synced");
|
||||
currentSequence = updateHighestSyncedSequence(currentSequence);
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -25,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -72,11 +74,19 @@ public class TestAsyncFSWALDurability extends WALDurabilityTestBase<CustomAsyncF
|
|||
protected Boolean getSyncFlag(CustomAsyncFSWAL wal) {
|
||||
return wal.getSyncFlag();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Boolean getWriterSyncFlag(CustomAsyncFSWAL wal) {
|
||||
return wal.getWriterSyncFlag();
|
||||
}
|
||||
}
|
||||
|
||||
class CustomAsyncFSWAL extends AsyncFSWAL {
|
||||
|
||||
private Boolean syncFlag;
|
||||
|
||||
private Boolean writerSyncFlag;
|
||||
|
||||
public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf,
|
||||
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
|
||||
throws FailedLogCloseException, IOException {
|
||||
|
@ -84,6 +94,34 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
|
|||
eventLoopGroup, channelClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AsyncWriter createWriterInstance(Path path) throws IOException {
|
||||
AsyncWriter writer = super.createWriterInstance(path);
|
||||
return new AsyncWriter() {
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
writer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLength() {
|
||||
return writer.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Long> sync(boolean forceSync) {
|
||||
writerSyncFlag = forceSync;
|
||||
return writer.sync(forceSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(Entry entry) {
|
||||
writer.append(entry);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
syncFlag = forceSync;
|
||||
|
@ -98,9 +136,14 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
|
|||
|
||||
void resetSyncFlag() {
|
||||
this.syncFlag = null;
|
||||
this.writerSyncFlag = null;
|
||||
}
|
||||
|
||||
Boolean getSyncFlag() {
|
||||
return syncFlag;
|
||||
}
|
||||
|
||||
Boolean getWriterSyncFlag() {
|
||||
return writerSyncFlag;
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
|
@ -51,16 +52,51 @@ public class TestFSHLogDurability extends WALDurabilityTestBase<CustomFSHLog> {
|
|||
protected Boolean getSyncFlag(CustomFSHLog wal) {
|
||||
return wal.getSyncFlag();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Boolean getWriterSyncFlag(CustomFSHLog wal) {
|
||||
return wal.getWriterSyncFlag();
|
||||
}
|
||||
}
|
||||
|
||||
class CustomFSHLog extends FSHLog {
|
||||
private Boolean syncFlag;
|
||||
|
||||
private Boolean writerSyncFlag;
|
||||
|
||||
public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf)
|
||||
throws IOException {
|
||||
super(fs, root, logDir, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writer createWriterInstance(Path path) throws IOException {
|
||||
Writer writer = super.createWriterInstance(path);
|
||||
return new Writer() {
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
writer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLength() {
|
||||
return writer.getLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
writerSyncFlag = forceSync;
|
||||
writer.sync(forceSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(Entry entry) throws IOException {
|
||||
writer.append(entry);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(boolean forceSync) throws IOException {
|
||||
syncFlag = forceSync;
|
||||
|
@ -75,9 +111,14 @@ class CustomFSHLog extends FSHLog {
|
|||
|
||||
void resetSyncFlag() {
|
||||
this.syncFlag = null;
|
||||
this.writerSyncFlag = null;
|
||||
}
|
||||
|
||||
Boolean getSyncFlag() {
|
||||
return syncFlag;
|
||||
}
|
||||
|
||||
Boolean getWriterSyncFlag() {
|
||||
return writerSyncFlag;
|
||||
}
|
||||
}
|
|
@ -17,7 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -77,25 +76,39 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
|
|||
|
||||
protected abstract Boolean getSyncFlag(T wal);
|
||||
|
||||
protected abstract Boolean getWriterSyncFlag(T wal);
|
||||
|
||||
@Test
|
||||
public void testWALDurability() throws IOException {
|
||||
byte[] bytes = Bytes.toBytes(getName());
|
||||
Put put = new Put(bytes);
|
||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
|
||||
|
||||
// global hbase.wal.hsync false, no override in put call - hflush
|
||||
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false");
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
Path rootDir = new Path(dir + getName());
|
||||
T wal = getWAL(fs, rootDir, getName(), conf);
|
||||
HRegion region = initHRegion(tableName, null, null, wal);
|
||||
byte[] bytes = Bytes.toBytes(getName());
|
||||
Put put = new Put(bytes);
|
||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
|
||||
try {
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
assertNull(getWriterSyncFlag(wal));
|
||||
region.put(put);
|
||||
assertFalse(getSyncFlag(wal));
|
||||
assertFalse(getWriterSyncFlag(wal));
|
||||
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
region.put(put);
|
||||
assertFalse(getSyncFlag(wal));
|
||||
|
||||
region.close();
|
||||
wal.close();
|
||||
// global hbase.wal.hsync false, durability set in put call - fsync
|
||||
put.setDurability(Durability.FSYNC_WAL);
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
assertNull(getWriterSyncFlag(wal));
|
||||
region.put(put);
|
||||
assertTrue(getSyncFlag(wal));
|
||||
assertTrue(getWriterSyncFlag(wal));
|
||||
} finally {
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
}
|
||||
|
||||
// global hbase.wal.hsync true, no override in put call
|
||||
conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true");
|
||||
|
@ -103,28 +116,36 @@ public abstract class WALDurabilityTestBase<T extends WAL> {
|
|||
wal = getWAL(fs, rootDir, getName(), conf);
|
||||
region = initHRegion(tableName, null, null, wal);
|
||||
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
region.put(put);
|
||||
assertEquals(getSyncFlag(wal), true);
|
||||
try {
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
assertNull(getWriterSyncFlag(wal));
|
||||
region.put(put);
|
||||
assertTrue(getSyncFlag(wal));
|
||||
assertTrue(getWriterSyncFlag(wal));
|
||||
|
||||
// global hbase.wal.hsync true, durability set in put call - fsync
|
||||
put.setDurability(Durability.FSYNC_WAL);
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
region.put(put);
|
||||
assertTrue(getSyncFlag(wal));
|
||||
// global hbase.wal.hsync true, durability set in put call - fsync
|
||||
put.setDurability(Durability.FSYNC_WAL);
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
assertNull(getWriterSyncFlag(wal));
|
||||
region.put(put);
|
||||
assertTrue(getSyncFlag(wal));
|
||||
assertTrue(getWriterSyncFlag(wal));
|
||||
|
||||
// global hbase.wal.hsync true, durability set in put call - sync
|
||||
put = new Put(bytes);
|
||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
|
||||
put.setDurability(Durability.SYNC_WAL);
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
region.put(put);
|
||||
assertFalse(getSyncFlag(wal));
|
||||
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
// global hbase.wal.hsync true, durability set in put call - sync
|
||||
put = new Put(bytes);
|
||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
|
||||
put.setDurability(Durability.SYNC_WAL);
|
||||
resetSyncFlag(wal);
|
||||
assertNull(getSyncFlag(wal));
|
||||
assertNull(getWriterSyncFlag(wal));
|
||||
region.put(put);
|
||||
assertFalse(getSyncFlag(wal));
|
||||
assertFalse(getWriterSyncFlag(wal));
|
||||
} finally {
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
}
|
||||
}
|
||||
|
||||
private String getName() {
|
||||
|
|
Loading…
Reference in New Issue