HBASE-6420 Gracefully shutdown logsyncer

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1363180 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2012-07-19 00:00:40 +00:00
parent 5586daa4de
commit dc17a2559c
1 changed files with 13 additions and 8 deletions

View File

@ -41,6 +41,7 @@ import java.util.TreeSet;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -984,7 +985,6 @@ public class HLog implements Syncable {
*/ */
public void close() throws IOException { public void close() throws IOException {
try { try {
logSyncerThread.interrupt();
logSyncerThread.close(); logSyncerThread.close();
// Make sure we synced everything // Make sure we synced everything
logSyncerThread.join(this.optionalFlushInterval*2); logSyncerThread.join(this.optionalFlushInterval*2);
@ -1197,8 +1197,8 @@ public class HLog implements Syncable {
class LogSyncer extends HasThread { class LogSyncer extends HasThread {
private final long optionalFlushInterval; private final long optionalFlushInterval;
private boolean closeLogSyncer = false; private AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
// List of pending writes to the HLog. There corresponds to transactions // List of pending writes to the HLog. There corresponds to transactions
// that have not yet returned to the client. We keep them cached here // that have not yet returned to the client. We keep them cached here
@ -1217,11 +1217,13 @@ public class HLog implements Syncable {
try { try {
// awaiting with a timeout doesn't always // awaiting with a timeout doesn't always
// throw exceptions on interrupt // throw exceptions on interrupt
while(!this.isInterrupted() && !closeLogSyncer) { while(!this.isInterrupted() && !closeLogSyncer.get()) {
try { try {
if (unflushedEntries.get() <= syncedTillHere) { if (unflushedEntries.get() <= syncedTillHere) {
Thread.sleep(this.optionalFlushInterval); synchronized (closeLogSyncer) {
closeLogSyncer.wait(this.optionalFlushInterval);
}
} }
sync(); sync();
} catch (IOException e) { } catch (IOException e) {
@ -1260,9 +1262,12 @@ public class HLog implements Syncable {
writer.append(e); writer.append(e);
} }
} }
void close(){ void close() {
closeLogSyncer = true; synchronized (closeLogSyncer) {
closeLogSyncer.set(true);
closeLogSyncer.notifyAll();
}
} }
} }