HBASE-13515 Handle FileNotFoundException in region replica replay for flush/compaction events
parent 8e6353ccd1
commit 0c6b1c9388
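The pattern this commit applies throughout: when a secondary region replica replays a flush, compaction, region-open, or bulk-load marker, the store files named in the event may already have been compacted away on the primary and then deleted from the archive, so the replay path now catches FileNotFoundException, logs a warning, and skips the missing files instead of failing the replay. A minimal, self-contained sketch of that pattern follows; the Store interface and helper names here are hypothetical placeholders, not the actual HBase classes.

import java.io.FileNotFoundException;
import java.util.List;
import java.util.logging.Logger;

class ReplaySkipMissingFilesSketch {
  private static final Logger LOG =
      Logger.getLogger(ReplaySkipMissingFilesSketch.class.getName());

  // Hypothetical stand-in for a column-family store that can pick up new files.
  interface Store {
    void refreshStoreFiles(List<String> storeFiles) throws FileNotFoundException;
  }

  // Tries to pick up the store files named in a replayed event. Returns true if the
  // files were loaded, false if at least one of them no longer exists (already
  // compacted on the primary and removed from the archive); the caller then skips
  // these files rather than aborting the whole replay.
  static boolean replayStoreFiles(Store store, List<String> storeFiles) {
    try {
      store.refreshStoreFiles(storeFiles); // replace the files with the new ones
      return true;
    } catch (FileNotFoundException ex) {
      LOG.warning("At least one of the store files " + storeFiles
          + " doesn't exist any more; skipping: " + ex);
      return false;
    }
  }
}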
@@ -4073,6 +4073,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
        }
        store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
        logRegionFiles();
      } catch (FileNotFoundException ex) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in compaction: "
          + TextFormat.shortDebugString(compaction)
          + " doesn't exist any more. Skip loading the file(s)", ex);
      } finally {
        closeRegionOperation(Operation.REPLAY_EVENT);
      }
@@ -4341,16 +4346,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
        // flushes from ALL stores.
        getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());

        // C. Finally notify anyone waiting on memstore to clear:
        // e.g. checkResources().
        synchronized (this) {
          notifyAll(); // FindBugs NN_NAKED_NOTIFY
        }
      } finally {
      } catch (FileNotFoundException ex) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in flush: " + TextFormat.shortDebugString(flush)
          + " doesn't exist any more. Skip loading the file(s)", ex);
      }
      finally {
        status.cleanup();
        writestate.notifyAll();
      }
    }

    // C. Finally notify anyone waiting on memstore to clear:
    // e.g. checkResources().
    synchronized (this) {
      notifyAll(); // FindBugs NN_NAKED_NOTIFY
    }
  }

  /**
@@ -4389,6 +4400,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
          + Bytes.toString(family) + " but no associated flush context. Ignoring");
        continue;
      }

      ctx.replayFlush(flushFiles, dropMemstoreSnapshot); // replay the flush

      // Record latest flush time
@@ -4531,7 +4543,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi

      long storeSeqId = store.getMaxSequenceId();
      List<String> storeFiles = storeDescriptor.getStoreFileList();
      store.refreshStoreFiles(storeFiles); // replace the files with the new ones
      try {
        store.refreshStoreFiles(storeFiles); // replace the files with the new ones
      } catch (FileNotFoundException ex) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files: " + storeFiles
          + " doesn't exist any more. Skip loading the file(s)", ex);
        continue;
      }
      if (store.getMaxSequenceId() != storeSeqId) {
        // Record latest flush time if we picked up new files
        lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
@@ -408,7 +408,10 @@ public class StoreFileInfo {
      // Tabledir is up two directories from where Reference was written.
      Path tableDir = p.getParent().getParent().getParent();
      String nameStrippedOfSuffix = m.group(1);
      LOG.debug("reference '" + p + "' to region=" + otherRegion + " hfile=" + nameStrippedOfSuffix);
      if (LOG.isDebugEnabled()) {
        LOG.debug("reference '" + p + "' to region=" + otherRegion
          + " hfile=" + nameStrippedOfSuffix);
      }

      // Build up new path with the referenced region in place of our current
      // region in the reference path. Also strip regionname suffix from name.
@@ -28,9 +28,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -58,15 +56,17 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
@@ -89,7 +89,6 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

import com.google.common.collect.Lists;
@@ -1510,6 +1509,79 @@ public class TestHRegionReplayEvents {
    }
  }

  @Test
  public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
    // tests replaying flush commit marker, but the flush file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder().
        setFlushSequenceNumber(Long.MAX_VALUE)
        .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
        .setAction(FlushAction.COMMIT_FLUSH)
        .setEncodedRegionName(
            ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
        .addStoreFlushes(StoreFlushDescriptor.newBuilder()
            .setFamilyName(ByteString.copyFrom(families[0]))
            .setStoreHomeDir("/store_home_dir")
            .addFlushOutput("/foo/baz/bar")
            .build())
        .build());
  }

  @Test
  public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
    // tests replaying compaction marker, but the compaction output file has already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
        .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
        .setEncodedRegionName(
            ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
        .setFamilyName(ByteString.copyFrom(families[0]))
        .addCompactionInput("/foo")
        .addCompactionOutput("/bar")
        .setStoreHomeDir("/store_home_dir")
        .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
        .build()
        , true, true, Long.MAX_VALUE);
  }

  @Test
  public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying region open event marker, but the region files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
        .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
        .setEncodedRegionName(
            ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
        .setEventType(EventType.REGION_OPEN)
        .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
        .setLogSequenceNumber(Long.MAX_VALUE)
        .addStores(StoreDescriptor.newBuilder()
            .setFamilyName(ByteString.copyFrom(families[0]))
            .setStoreHomeDir("/store_home_dir")
            .addStoreFile("/foo")
            .build())
        .build());
  }

  @Test
  public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
    // tests replaying bulk load event marker, but the bulk load files have already been compacted
    // from primary and also deleted from the archive directory
    secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
        .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDesc().getTableName()))
        .setEncodedRegionName(
            ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
        .setBulkloadSeqNum(Long.MAX_VALUE)
        .addStores(StoreDescriptor.newBuilder()
            .setFamilyName(ByteString.copyFrom(families[0]))
            .setStoreHomeDir("/store_home_dir")
            .addStoreFile("/foo")
            .build())
        .build());
  }

  private String createHFileForFamilies(Path testPath, byte[] family,
      byte[] valueBytes) throws IOException {
    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());