HBASE-5782 Edits can be appended out of seqid order since HBASE-4487
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1327673 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
89a985f28c
commit
e8ec0b9f8c
|
@ -230,6 +230,7 @@ public class HLog implements Syncable {
|
||||||
// during an update
|
// during an update
|
||||||
// locked during appends
|
// locked during appends
|
||||||
private final Object updateLock = new Object();
|
private final Object updateLock = new Object();
|
||||||
|
private final Object flushLock = new Object();
|
||||||
|
|
||||||
private final boolean enabled;
|
private final boolean enabled;
|
||||||
|
|
||||||
|
@ -297,7 +298,6 @@ public class HLog implements Syncable {
|
||||||
private static Metric writeSize = new Metric();
|
private static Metric writeSize = new Metric();
|
||||||
// For measuring latency of syncs
|
// For measuring latency of syncs
|
||||||
private static Metric syncTime = new Metric();
|
private static Metric syncTime = new Metric();
|
||||||
private static AtomicLong syncBatchSize = new AtomicLong();
|
|
||||||
//For measuring slow HLog appends
|
//For measuring slow HLog appends
|
||||||
private static AtomicLong slowHLogAppendCount = new AtomicLong();
|
private static AtomicLong slowHLogAppendCount = new AtomicLong();
|
||||||
private static Metric slowHLogAppendTime = new Metric();
|
private static Metric slowHLogAppendTime = new Metric();
|
||||||
|
@ -314,10 +314,6 @@ public class HLog implements Syncable {
|
||||||
return syncTime.get();
|
return syncTime.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static long getSyncBatchSize() {
|
|
||||||
return syncBatchSize.getAndSet(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static long getSlowAppendCount() {
|
public static long getSlowAppendCount() {
|
||||||
return slowHLogAppendCount.get();
|
return slowHLogAppendCount.get();
|
||||||
}
|
}
|
||||||
|
@ -1258,32 +1254,43 @@ public class HLog implements Syncable {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
long doneUpto = this.unflushedEntries.get();
|
long doneUpto;
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
// Done in parallel for all writer threads, thanks to HDFS-895
|
|
||||||
List<Entry> pending = logSyncerThread.getPendingWrites();
|
|
||||||
try {
|
|
||||||
// First flush all the pending writes to HDFS. Then
|
// First flush all the pending writes to HDFS. Then
|
||||||
// issue the sync to HDFS. If sync is successful, then update
|
// issue the sync to HDFS. If sync is successful, then update
|
||||||
// syncedTillHere to indicate that transactions till this
|
// syncedTillHere to indicate that transactions till this
|
||||||
// number has been successfully synced.
|
// number has been successfully synced.
|
||||||
|
synchronized (flushLock) {
|
||||||
|
if (txid <= this.syncedTillHere) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
doneUpto = this.unflushedEntries.get();
|
||||||
|
List<Entry> pending = logSyncerThread.getPendingWrites();
|
||||||
|
try {
|
||||||
logSyncerThread.hlogFlush(tempWriter, pending);
|
logSyncerThread.hlogFlush(tempWriter, pending);
|
||||||
pending = null;
|
|
||||||
tempWriter.sync();
|
|
||||||
syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
|
|
||||||
this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
|
|
||||||
} catch(IOException io) {
|
} catch(IOException io) {
|
||||||
synchronized (this.updateLock) {
|
synchronized (this.updateLock) {
|
||||||
// HBASE-4387, HBASE-5623, retry with updateLock held
|
// HBASE-4387, HBASE-5623, retry with updateLock held
|
||||||
tempWriter = this.writer;
|
tempWriter = this.writer;
|
||||||
logSyncerThread.hlogFlush(tempWriter, pending);
|
logSyncerThread.hlogFlush(tempWriter, pending);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// another thread might have sync'ed avoid double-sync'ing
|
||||||
|
if (txid <= this.syncedTillHere) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
tempWriter.sync();
|
||||||
|
} catch(IOException io) {
|
||||||
|
synchronized (this.updateLock) {
|
||||||
|
// HBASE-4387, HBASE-5623, retry with updateLock held
|
||||||
|
tempWriter = this.writer;
|
||||||
tempWriter.sync();
|
tempWriter.sync();
|
||||||
syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
|
|
||||||
this.syncedTillHere = doneUpto;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// We try to not acquire the updateLock just to update statistics.
|
this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
|
||||||
// Make these statistics as AtomicLong.
|
|
||||||
syncTime.inc(System.currentTimeMillis() - now);
|
syncTime.inc(System.currentTimeMillis() - now);
|
||||||
if (!this.logRollRunning) {
|
if (!this.logRollRunning) {
|
||||||
checkLowReplication();
|
checkLowReplication();
|
||||||
|
|
|
@ -132,6 +132,19 @@ public class TestHLog {
|
||||||
return "TestHLog";
|
return "TestHLog";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that with three concurrent threads we still write edits in sequence
|
||||||
|
* edit id order.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMaintainOrderWithConcurrentWrites() throws Exception {
|
||||||
|
// Run the HPE tool with three threads writing 3000 edits each concurrently.
|
||||||
|
// When done, verify that all edits were written and that the order in the
|
||||||
|
// WALs is of ascending edit sequence ids.
|
||||||
|
HLogPerformanceEvaluation.main(new String [] {"-threads", "3", "-verify", "-iterations", "3000"});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Just write multiple logs then split. Before fix for HADOOP-2283, this
|
* Just write multiple logs then split. Before fix for HADOOP-2283, this
|
||||||
* would fail.
|
* would fail.
|
||||||
|
|
Loading…
Reference in New Issue