HADOOP-1903 Possible data loss if Exception happens between snapshot and flush to disk.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@575982 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2007-09-15 21:11:47 +00:00
parent 7ce0506c27
commit b271048e2f
7 changed files with 150 additions and 72 deletions

View File

@@ -48,6 +48,8 @@ Trunk (unreleased changes)
HADOOP-1870 Once file system failure has been detected, don't check it again
and get on with shutting down the hbase cluster.
HADOOP-1888 NullPointerException in HMemcacheScanner
HADOOP-1903 Possible data loss if Exception happens between snapshot and flush
to disk.
IMPROVEMENTS
HADOOP-1737 Make HColumnDescriptor data publically members settable

View File

@@ -0,0 +1,32 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
/**
* Thrown during flush when there is a possibility that snapshot content was
* not properly persisted into store files. Response should include replay of
* hlog content.
*/
public class DroppedSnapshotException extends IOException {
public DroppedSnapshotException(String msg) {
super(msg);
}
public DroppedSnapshotException() {
super();
}
}
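A minimal caller-side sketch (not part of this patch; the region and server variables are illustrative) of how the new exception is intended to be handled. It mirrors the Flusher change to HRegionServer further down:

    try {
      region.flushcache(false);
    } catch (DroppedSnapshotException dse) {
      // Snapshot content may not have reached the store files. The only
      // recovery at this point is a regionserver restart so that hlog replay
      // restores the dropped edits to the memcache.
      LOG.fatal("Replay of hlog required. Forcing server restart", dse);
      server.stop();
    } catch (IOException e) {
      // Other flush failures are not fatal; log them and retry on the next cycle.
      LOG.error("Cache flush failed", RemoteExceptionHandler.checkIOException(e));
    }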

View File

@@ -399,6 +399,7 @@ public class HLog implements HConstants {
* the flush will not appear in the correct logfile.
* @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
* @see #completeCacheFlush(Text, Text, long)
* @see #abortCacheFlush()
*/
synchronized long startCacheFlush() {
while (this.insideCacheFlush) {
@@ -422,7 +423,7 @@ public class HLog implements HConstants {
synchronized void completeCacheFlush(final Text regionName,
final Text tableName, final long logSeqId)
throws IOException {
if(closed) {
if(this.closed) {
return;
}
@@ -430,17 +431,32 @@ public class HLog implements HConstants {
throw new IOException("Impossible situation: inside " +
"completeCacheFlush(), but 'insideCacheFlush' flag is false");
}
writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
this.writer.append(key,
new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
System.currentTimeMillis()));
numEntries.getAndIncrement();
this.numEntries.getAndIncrement();
// Remember the most-recent flush for each region.
// This is used to delete obsolete log files.
regionToLastFlush.put(regionName, logSeqId);
this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
insideCacheFlush = false;
cleanup();
}
/**
* Abort a cache flush.
* This method will clear waits on {@link #insideCacheFlush}. Call if the
* flush fails. Note that the only recovery for an aborted flush currently
* is a restart of the regionserver so the snapshot content dropped by the
* failure gets restored to the memcache.
*/
synchronized void abortCacheFlush() {
cleanup();
}
private synchronized void cleanup() {
this.insideCacheFlush = false;
notifyAll();
}
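Taken together, startCacheFlush(), completeCacheFlush() and abortCacheFlush() form a small protocol. A rough sketch of the expected calling sequence, assuming a caller that has already taken the memcache snapshot (variable names are illustrative):

    long logSeqId = log.startCacheFlush();  // blocks while another flush is in progress
    try {
      // ... persist the memcache snapshot to the store files ...
      log.completeCacheFlush(regionName, tableName, logSeqId);
    } catch (IOException e) {
      // Clears insideCacheFlush and wakes any waiting flushers; the snapshot
      // is now recoverable only by replaying the hlog.
      log.abortCacheFlush();
      throw new DroppedSnapshotException(e.getMessage());
    }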

View File

@@ -657,7 +657,7 @@ HMasterRegionInterface {
if (checkFileSystem()) {
// If filesystem is OK, is the exception a ConnectionException?
// If so, mark the server as down. No point scanning either
// if no server to put meta region on.
// if no server to put meta region on. TODO.
if (e instanceof ConnectException) {
LOG.debug("Region hosting server is gone.");
}

View File

@@ -101,6 +101,7 @@ public class HMemcache {
}
Snapshot retval =
new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
// From here on, any failure is catastrophic requiring replay of hlog
this.snapshot = memcache;
history.add(memcache);
memcache = new TreeMap<HStoreKey, byte []>();
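The snapshot handed out here carries a matching cleanup obligation. A condensed sketch of the consumer side, drawn from internalFlushcache() in HRegion below (names abbreviated for illustration):

    HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
    if (retval == null || retval.memcacheSnapshot == null) {
      return;  // nothing to flush
    }
    try {
      for (HStore hstore: stores.values()) {
        hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
      }
      log.completeCacheFlush(regionName, tableName, retval.sequenceId);
    } catch (IOException e) {
      log.abortCacheFlush();  // failed flush: hlog replay required
      throw new DroppedSnapshotException(e.getMessage());
    } finally {
      memcache.deleteSnapshot();  // always release the snapshot
    }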

View File

@@ -721,6 +721,9 @@ public class HRegion implements HConstants {
/**
* Each HRegion is given a periodic chance to flush the cache, which it should
* only take if there have been a lot of uncommitted writes.
* @throws IOException
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
void optionallyFlush() throws IOException {
if(this.memcache.getSize() > this.memcacheFlushSize) {
@@ -754,6 +757,9 @@ public class HRegion implements HConstants {
* close() the HRegion shortly, so the HRegion should not take on any new and
* potentially long-lasting disk operations. This flush() should be the final
* pre-close() disk operation.
* @throws IOException
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
void flushcache(boolean disableFutureWrites)
throws IOException {
@@ -815,6 +821,9 @@ public class HRegion implements HConstants {
* routes.
*
* <p> This method may block for some time.
* @throws IOException
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
void internalFlushcache() throws IOException {
long startTime = -1;
@@ -833,13 +842,19 @@ public class HRegion implements HConstants {
//
// When execution returns from snapshotMemcacheForLog() with a non-NULL
// value, the HMemcache will have a snapshot object stored that must be
// explicitly cleaned up using a call to deleteSnapshot().
// explicitly cleaned up using a call to deleteSnapshot() or by calling
// abort.
//
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
if(retval == null || retval.memcacheSnapshot == null) {
LOG.debug("Finished memcache flush; empty snapshot");
return;
}
// Any failure from here on out will be catastrophic, requiring a server
// restart so hlog content can be replayed and put back into the memcache.
// Otherwise, the snapshot content, while backed up in the hlog, will not
// be part of the current running server's state.
try {
long logCacheFlushId = retval.sequenceId;
if(LOG.isDebugEnabled()) {
@@ -852,7 +867,7 @@ public class HRegion implements HConstants {
// A. Flush memcache to all the HStores.
// Keep running vector of all store files that includes both old and the
// just-made new flush store file.
for(HStore hstore: stores.values()) {
for (HStore hstore: stores.values()) {
hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
}
@@ -860,17 +875,18 @@ public class HRegion implements HConstants {
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
log.completeCacheFlush(this.regionInfo.regionName,
regionInfo.tableDesc.getName(), logCacheFlushId);
this.log.completeCacheFlush(this.regionInfo.regionName,
regionInfo.tableDesc.getName(), logCacheFlushId);
} catch (IOException e) {
LOG.fatal("Interrupted while flushing. Edits lost. FIX! HADOOP-1903", e);
log.abort();
throw e;
// An exception here means that the snapshot was not persisted.
// The hlog needs to be replayed so its content is restored to memcache.
// Currently, only a server restart will do this.
this.log.abortCacheFlush();
throw new DroppedSnapshotException(e.getMessage());
} finally {
// C. Delete the now-irrelevant memcache snapshot; its contents have been
// dumped to disk-based HStores.
memcache.deleteSnapshot();
// dumped to disk-based HStores or, if error, clear aborted snapshot.
this.memcache.deleteSnapshot();
}
// D. Finally notify anyone waiting on memcache to clear:
@@ -1386,7 +1402,7 @@ public class HRegion implements HConstants {
}
/*
* Add updates to the log and add values to the memcache.
* Add updates first to the hlog and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
* @param row Row to update.
* @param timestamp Timestamp to record the updates against
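The reworded comment stresses ordering: an edit goes to the hlog before it becomes visible in the memcache, so a crash between the two steps is recoverable by replay. A hedged sketch of that ordering only; the method signatures below are hypothetical and not taken from this patch:

    // Hypothetical write path illustrating the ordering, not the actual HRegion internals.
    log.append(regionName, tableName, row, edits, timestamp);  // 1. make the edit durable in the hlog
    memcache.add(row, edits, timestamp);                       // 2. then make it visible to readers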

View File

@@ -292,6 +292,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
for(HRegion cur: nonClosedRegionsToFlush) {
try {
cur.optionallyFlush();
} catch (DroppedSnapshotException e) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of hlog
// is required. Currently the only way to do this is a restart of
// the server.
LOG.fatal("Replay of hlog required. Forcing server restart", e);
if (!checkFileSystem()) {
break;
}
HRegionServer.this.stop();
} catch (IOException iex) {
LOG.error("Cache flush failed",
RemoteExceptionHandler.checkIOException(iex));
@@ -442,11 +452,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/**
* Sets a flag that will cause all the HRegionServer threads to shut down
* in an orderly fashion.
* <p>FOR DEBUGGING ONLY
* in an orderly fashion. Used by unit tests and called by {@link Flusher}
* if it judges that the server needs to be restarted.
*/
synchronized void stop() {
stopRequested.set(true);
this.stopRequested.set(true);
notifyAll(); // Wakes run() if it is sleeping
}
@@ -457,7 +467,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* from under hbase or we OOME.
*/
synchronized void abort() {
abortRequested = true;
this.abortRequested = true;
stop();
}
@@ -621,7 +631,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
if (this.fsOk) {
// Only try to clean up if the file system is available
try {
log.close();
this.log.close();
LOG.info("On abort, closed hlog");
} catch (IOException e) {
LOG.error("Unable to close log in abort",
@@ -661,7 +671,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
join();
LOG.info("main thread exiting");
LOG.info(Thread.currentThread().getName() + " exiting");
}
/*
@@ -674,7 +684,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* run. On its way out, this server will shut down Server. Leases are sort
* of inbetween. It has an internal thread that while it inherits from
* Chore, it keeps its own internal stop mechanism so needs to be stopped
* by this hosting server.
* by this hosting server. Worker logs the exception and exits.
*/
private void startAllServices() {
String n = Thread.currentThread().getName();
@@ -731,6 +741,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
/** Add to the outbound message buffer */
private void reportOpen(HRegion region) {
synchronized(outboundMsgs) {
@@ -790,58 +801,58 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
public void run() {
try {
for(ToDoEntry e = null; !stopRequested.get(); ) {
try {
e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
// continue
}
if(e == null || stopRequested.get()) {
continue;
}
try {
LOG.info(e.msg.toString());
switch(e.msg.getMsg()) {
case HMsg.MSG_REGION_OPEN:
// Open a region
openRegion(e.msg.getRegionInfo());
break;
case HMsg.MSG_REGION_CLOSE:
// Close a region
closeRegion(e.msg.getRegionInfo(), true);
break;
case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
// Close a region, don't reply
closeRegion(e.msg.getRegionInfo(), false);
break;
default:
throw new AssertionError(
"Impossible state during msg processing. Instruction: "
+ e.msg.toString());
for(ToDoEntry e = null; !stopRequested.get(); ) {
try {
e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
// continue
}
} catch (IOException ie) {
ie = RemoteExceptionHandler.checkIOException(ie);
if(e.tries < numRetries) {
LOG.warn(ie);
e.tries++;
try {
toDo.put(e);
} catch (InterruptedException ex) {
throw new RuntimeException("Putting into msgQueue was interrupted.", ex);
}
} else {
LOG.error("unable to process message: " + e.msg.toString(), ie);
if (!checkFileSystem()) {
if(e == null || stopRequested.get()) {
continue;
}
try {
LOG.info(e.msg.toString());
switch(e.msg.getMsg()) {
case HMsg.MSG_REGION_OPEN:
// Open a region
openRegion(e.msg.getRegionInfo());
break;
case HMsg.MSG_REGION_CLOSE:
// Close a region
closeRegion(e.msg.getRegionInfo(), true);
break;
case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
// Close a region, don't reply
closeRegion(e.msg.getRegionInfo(), false);
break;
default:
throw new AssertionError(
"Impossible state during msg processing. Instruction: "
+ e.msg.toString());
}
} catch (IOException ie) {
ie = RemoteExceptionHandler.checkIOException(ie);
if(e.tries < numRetries) {
LOG.warn(ie);
e.tries++;
try {
toDo.put(e);
} catch (InterruptedException ex) {
throw new RuntimeException("Putting into msgQueue was " +
"interrupted.", ex);
}
} else {
LOG.error("unable to process message: " + e.msg.toString(), ie);
if (!checkFileSystem()) {
break;
}
}
}
}
}
} catch(Throwable t) {
LOG.fatal("Unhandled exception", t);
} finally {