HBASE-13515 Handle FileNotFoundException in region replica replay for flush/compaction events
This commit is contained in:
parent
2ba4c4eb9f
commit
4e0de088c5
|
@ -4073,6 +4073,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
|
store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
|
||||||
logRegionFiles();
|
logRegionFiles();
|
||||||
|
} catch (FileNotFoundException ex) {
|
||||||
|
LOG.warn(getRegionInfo().getEncodedName() + " : "
|
||||||
|
+ "At least one of the store files in compaction: "
|
||||||
|
+ TextFormat.shortDebugString(compaction)
|
||||||
|
+ " doesn't exist any more. Skip loading the file(s)", ex);
|
||||||
} finally {
|
} finally {
|
||||||
closeRegionOperation(Operation.REPLAY_EVENT);
|
closeRegionOperation(Operation.REPLAY_EVENT);
|
||||||
}
|
}
|
||||||
|
@ -4341,16 +4346,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
// flushes from ALL stores.
|
// flushes from ALL stores.
|
||||||
getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
|
getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
|
||||||
|
|
||||||
|
} catch (FileNotFoundException ex) {
|
||||||
|
LOG.warn(getRegionInfo().getEncodedName() + " : "
|
||||||
|
+ "At least one of the store files in flush: " + TextFormat.shortDebugString(flush)
|
||||||
|
+ " doesn't exist any more. Skip loading the file(s)", ex);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
status.cleanup();
|
||||||
|
writestate.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// C. Finally notify anyone waiting on memstore to clear:
|
// C. Finally notify anyone waiting on memstore to clear:
|
||||||
// e.g. checkResources().
|
// e.g. checkResources().
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
notifyAll(); // FindBugs NN_NAKED_NOTIFY
|
notifyAll(); // FindBugs NN_NAKED_NOTIFY
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
status.cleanup();
|
|
||||||
writestate.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -4389,6 +4400,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
+ Bytes.toString(family) + " but no associated flush context. Ignoring");
|
+ Bytes.toString(family) + " but no associated flush context. Ignoring");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.replayFlush(flushFiles, dropMemstoreSnapshot); // replay the flush
|
ctx.replayFlush(flushFiles, dropMemstoreSnapshot); // replay the flush
|
||||||
|
|
||||||
// Record latest flush time
|
// Record latest flush time
|
||||||
|
@ -4531,7 +4543,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
|
|
||||||
long storeSeqId = store.getMaxSequenceId();
|
long storeSeqId = store.getMaxSequenceId();
|
||||||
List<String> storeFiles = storeDescriptor.getStoreFileList();
|
List<String> storeFiles = storeDescriptor.getStoreFileList();
|
||||||
|
try {
|
||||||
store.refreshStoreFiles(storeFiles); // replace the files with the new ones
|
store.refreshStoreFiles(storeFiles); // replace the files with the new ones
|
||||||
|
} catch (FileNotFoundException ex) {
|
||||||
|
LOG.warn(getRegionInfo().getEncodedName() + " : "
|
||||||
|
+ "At least one of the store files: " + storeFiles
|
||||||
|
+ " doesn't exist any more. Skip loading the file(s)", ex);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (store.getMaxSequenceId() != storeSeqId) {
|
if (store.getMaxSequenceId() != storeSeqId) {
|
||||||
// Record latest flush time if we picked up new files
|
// Record latest flush time if we picked up new files
|
||||||
lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
|
lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
|
||||||
|
|
|
@ -408,7 +408,10 @@ public class StoreFileInfo {
|
||||||
// Tabledir is up two directories from where Reference was written.
|
// Tabledir is up two directories from where Reference was written.
|
||||||
Path tableDir = p.getParent().getParent().getParent();
|
Path tableDir = p.getParent().getParent().getParent();
|
||||||
String nameStrippedOfSuffix = m.group(1);
|
String nameStrippedOfSuffix = m.group(1);
|
||||||
LOG.debug("reference '" + p + "' to region=" + otherRegion + " hfile=" + nameStrippedOfSuffix);
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("reference '" + p + "' to region=" + otherRegion
|
||||||
|
+ " hfile=" + nameStrippedOfSuffix);
|
||||||
|
}
|
||||||
|
|
||||||
// Build up new path with the referenced region in place of our current
|
// Build up new path with the referenced region in place of our current
|
||||||
// region in the reference path. Also strip regionname suffix from name.
|
// region in the reference path. Also strip regionname suffix from name.
|
||||||
|
|
|
@ -28,9 +28,7 @@ import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;
|
import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -58,15 +56,17 @@ import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
|
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
|
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
|
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
|
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
|
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
|
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
|
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
|
||||||
|
@ -89,7 +89,6 @@ import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TemporaryFolder;
|
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
@ -1510,6 +1509,79 @@ public class TestHRegionReplayEvents {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
|
||||||
|
// tests replaying flush commit marker, but the flush file has already been compacted
|
||||||
|
// from primary and also deleted from the archive directory
|
||||||
|
secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder().
|
||||||
|
setFlushSequenceNumber(Long.MAX_VALUE)
|
||||||
|
.setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
|
||||||
|
.setAction(FlushAction.COMMIT_FLUSH)
|
||||||
|
.setEncodedRegionName(
|
||||||
|
ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
|
||||||
|
.setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
|
||||||
|
.addStoreFlushes(StoreFlushDescriptor.newBuilder()
|
||||||
|
.setFamilyName(ByteString.copyFrom(families[0]))
|
||||||
|
.setStoreHomeDir("/store_home_dir")
|
||||||
|
.addFlushOutput("/foo/baz/bar")
|
||||||
|
.build())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
|
||||||
|
// tests replaying compaction marker, but the compaction output file has already been compacted
|
||||||
|
// from primary and also deleted from the archive directory
|
||||||
|
secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
|
||||||
|
.setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
|
||||||
|
.setEncodedRegionName(
|
||||||
|
ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
|
||||||
|
.setFamilyName(ByteString.copyFrom(families[0]))
|
||||||
|
.addCompactionInput("/foo")
|
||||||
|
.addCompactionOutput("/bar")
|
||||||
|
.setStoreHomeDir("/store_home_dir")
|
||||||
|
.setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
|
||||||
|
.build()
|
||||||
|
, true, true, Long.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
|
||||||
|
// tests replaying region open event marker, but the region files have already been compacted
|
||||||
|
// from primary and also deleted from the archive directory
|
||||||
|
secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
|
||||||
|
.setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
|
||||||
|
.setEncodedRegionName(
|
||||||
|
ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
|
||||||
|
.setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
|
||||||
|
.setEventType(EventType.REGION_OPEN)
|
||||||
|
.setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
|
||||||
|
.setLogSequenceNumber(Long.MAX_VALUE)
|
||||||
|
.addStores(StoreDescriptor.newBuilder()
|
||||||
|
.setFamilyName(ByteString.copyFrom(families[0]))
|
||||||
|
.setStoreHomeDir("/store_home_dir")
|
||||||
|
.addStoreFile("/foo")
|
||||||
|
.build())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
|
||||||
|
// tests replaying bulk load event marker, but the bulk load files have already been compacted
|
||||||
|
// from primary and also deleted from the archive directory
|
||||||
|
secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
|
||||||
|
.setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDesc().getTableName()))
|
||||||
|
.setEncodedRegionName(
|
||||||
|
ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
|
||||||
|
.setBulkloadSeqNum(Long.MAX_VALUE)
|
||||||
|
.addStores(StoreDescriptor.newBuilder()
|
||||||
|
.setFamilyName(ByteString.copyFrom(families[0]))
|
||||||
|
.setStoreHomeDir("/store_home_dir")
|
||||||
|
.addStoreFile("/foo")
|
||||||
|
.build())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
private String createHFileForFamilies(Path testPath, byte[] family,
|
private String createHFileForFamilies(Path testPath, byte[] family,
|
||||||
byte[] valueBytes) throws IOException {
|
byte[] valueBytes) throws IOException {
|
||||||
HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
|
HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
|
||||||
|
|
Loading…
Reference in New Issue