HBASE-11511 Write flush events to WAL

Enis Soztutar 2014-07-15 14:47:19 -07:00
parent fe50c6d366
commit bbe29eb93c
9 changed files with 2701 additions and 13 deletions

ProtobufUtil.java

@@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
@@ -117,6 +118,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.TablePermission;
import org.apache.hadoop.hbase.security.access.UserPermission;
@@ -2499,6 +2502,29 @@ public final class ProtobufUtil {
return builder.build();
}
public static FlushDescriptor toFlushDescriptor(FlushAction action, HRegionInfo hri,
long flushSeqId, Map<byte[], List<Path>> committedFiles) {
FlushDescriptor.Builder desc = FlushDescriptor.newBuilder()
.setAction(action)
.setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
.setFlushSequenceNumber(flushSeqId)
.setTableName(ByteStringer.wrap(hri.getTable().getName()));
for (Map.Entry<byte[], List<Path>> entry : committedFiles.entrySet()) {
WALProtos.FlushDescriptor.StoreFlushDescriptor.Builder builder =
WALProtos.FlushDescriptor.StoreFlushDescriptor.newBuilder()
.setFamilyName(ByteStringer.wrap(entry.getKey()))
.setStoreHomeDir(Bytes.toString(entry.getKey())); //relative to region
if (entry.getValue() != null) {
for (Path path : entry.getValue()) {
builder.addFlushOutput(path.getName());
}
}
desc.addStoreFlushes(builder);
}
return desc.build();
}
/**
* Return short version of Message toString'd, shorter than TextFormat#shortDebugString.
* Tries to NOT print out data both because it can be big but also so we do not have data in our

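For orientation, a minimal, hedged sketch of calling the new toFlushDescriptor helper; it is not part of the commit, and the table, family, and hfile names are made up. The committedFiles map mirrors the structure HRegion builds during internalFlushcache() below.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.util.Bytes;

public class ToFlushDescriptorSketch {
  public static void main(String[] args) {
    HRegionInfo hri = new HRegionInfo(TableName.valueOf("example_table")); // hypothetical region
    // family name -> hfiles written by the flush (null before the flush is committed)
    Map<byte[], List<Path>> committedFiles =
        new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
    committedFiles.put(Bytes.toBytes("family"),
        Arrays.asList(new Path("family/3f1a2b4c5d6e7f8090a1b2c3d4e5f607"))); // hypothetical hfile
    FlushDescriptor commit = ProtobufUtil.toFlushDescriptor(
        FlushAction.COMMIT_FLUSH, hri, 42L, committedFiles);
    System.out.println(commit); // protobuf text form: action, table, region, store_flushes
  }
}

Passing null for a family's file list (as HRegion does when writing the START_FLUSH marker) simply yields a store_flushes entry with no flush_output.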
WAL.proto

@@ -89,12 +89,35 @@ message CompactionDescriptor {
required bytes table_name = 1; // TODO: WALKey already stores these, might remove
required bytes encoded_region_name = 2;
required bytes family_name = 3;
repeated string compaction_input = 4; // relative to store dir
repeated string compaction_output = 5;
required string store_home_dir = 6; // relative to region dir
optional bytes region_name = 7; // full region name
}
/**
* Special WAL entry to hold all related to a flush.
*/
message FlushDescriptor {
enum FlushAction {
START_FLUSH = 0;
COMMIT_FLUSH = 1;
ABORT_FLUSH = 2;
}
message StoreFlushDescriptor {
required bytes family_name = 1;
required string store_home_dir = 2; //relative to region dir
repeated string flush_output = 3; // relative to store dir (if this is a COMMIT_FLUSH)
}
required FlushAction action = 1;
required bytes table_name = 2;
required bytes encoded_region_name = 3;
optional uint64 flush_sequence_number = 4;
repeated StoreFlushDescriptor store_flushes = 5;
}
/**
* A trailer that is appended to the end of a properly closed HLog WAL file.
* If missing, this is either a legacy or a corrupted WAL file.

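For reference only (not in the commit), a hedged sketch of the Java bindings protoc generates for the new message, built with hypothetical values; desc.toByteArray() is the payload that the WAL marker cell carries.

import com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;

public class FlushDescriptorWireSketch {
  public static void main(String[] args) throws Exception {
    FlushDescriptor desc = FlushDescriptor.newBuilder()
        .setAction(FlushAction.COMMIT_FLUSH)
        .setTableName(ByteString.copyFromUtf8("example_table"))            // hypothetical
        .setEncodedRegionName(ByteString.copyFromUtf8("abcdef0123456789")) // hypothetical
        .setFlushSequenceNumber(42L)
        .addStoreFlushes(StoreFlushDescriptor.newBuilder()
            .setFamilyName(ByteString.copyFromUtf8("family"))
            .setStoreHomeDir("family")            // relative to the region dir
            .addFlushOutput("3f1a2b4c5d6e7f80"))  // hfile name, relative to the store dir
        .build();
    byte[] wire = desc.toByteArray();             // the bytes stored as the WAL cell value
    FlushDescriptor parsed = FlushDescriptor.parseFrom(wire);
    System.out.println(parsed.getAction() + " outputs="
        + parsed.getStoreFlushes(0).getFlushOutputCount());
  }
}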
HRegion.java

@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -113,10 +114,13 @@ import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1729,8 +1733,11 @@ public class HRegion implements HeapSize { // , Writable{
status.setStatus("Preparing to flush by snapshotting stores in " +
getRegionInfo().getEncodedName());
List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
Bytes.BYTES_COMPARATOR);
long flushSeqId = -1L;
long trxId = 0;
try {
try {
w = mvcc.beginMemstoreInsert();
@@ -1754,12 +1761,39 @@ public class HRegion implements HeapSize { // , Writable{
for (Store s : stores.values()) {
totalFlushableSize += s.getFlushableSize();
storeFlushCtxs.add(s.createFlushContext(flushSeqId));
committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
}
// write the snapshot start to WAL
if (wal != null) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
}
// Prepare flush (take a snapshot)
for (StoreFlushContext flush : storeFlushCtxs) {
flush.prepare();
}
} catch (IOException ex) {
if (wal != null) {
if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
StringUtils.stringifyException(t));
// ignore this since we will be aborting the RS with DSE.
}
}
// we have called wal.startCacheFlush(), now we have to abort it
wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
throw ex; // let upper layers deal with it.
}
} finally {
this.updatesLock.writeLock().unlock();
}
@@ -1767,9 +1801,16 @@ public class HRegion implements HeapSize { // , Writable{
", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
// sync unflushed WAL changes
// see HBASE-8208 for details
if (wal != null) {
try {
wal.sync(); // ensure that flush marker is sync'ed
} catch (IOException ioe) {
LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: "
+ StringUtils.stringifyException(ioe));
}
}
// wait for all in-progress transactions to commit to HLog before
// we can start the flush. This prevents
@@ -1806,16 +1847,27 @@ public class HRegion implements HeapSize { // , Writable{
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
// same order
for (StoreFlushContext flush : storeFlushCtxs) {
boolean needsCompaction = flush.commit(status);
if (needsCompaction) {
compactionRequested = true;
}
committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
}
storeFlushCtxs.clear();
// Set down the memstore size by amount of flush.
this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
if (wal != null) {
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, true);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
// The hlog needs to be replayed so its content is restored to memstore.
@@ -1824,6 +1876,16 @@ public class HRegion implements HeapSize { // , Writable{
// exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
// all and sundry.
if (wal != null) {
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushSeqId, committedFiles);
HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable ex) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
StringUtils.stringifyException(ex));
// ignore this since we will be aborting the RS with DSE.
}
wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
}
DroppedSnapshotException dse = new DroppedSnapshotException("region: " +

HStore.java

@@ -2035,6 +2035,7 @@ public class HStore implements Store {
private long cacheFlushSeqNum;
private MemStoreSnapshot snapshot;
private List<Path> tempFiles;
private List<Path> committedFiles;
private StoreFlusherImpl(long cacheFlushSeqNum) {
this.cacheFlushSeqNum = cacheFlushSeqNum;
@@ -2047,6 +2048,7 @@ public class HStore implements Store {
@Override
public void prepare() {
this.snapshot = memstore.snapshot();
committedFiles = new ArrayList<Path>(1);
}
@Override
@@ -2079,14 +2081,20 @@
}
}
for (StoreFile sf : storeFiles) {
if (HStore.this.getCoprocessorHost() != null) {
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
}
committedFiles.add(sf.getPath());
} }
// Add new file to store files. Clear snapshot too while we have the Store write lock.
return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
}
@Override
public List<Path> getCommittedFiles() {
return committedFiles;
}
}
@Override

StoreFlushContext.java

@@ -19,8 +19,10 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
/**
@@ -61,4 +63,10 @@ interface StoreFlushContext {
* @throws IOException
*/
boolean commit(MonitoredTask status) throws IOException;
/**
* Returns the newly committed files from the flush. Called only if commit returns true
* @return a list of Paths for new files
*/
List<Path> getCommittedFiles();
}

HLogUtil.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.util.FSUtils;
import com.google.protobuf.TextFormat;
@@ -268,4 +269,19 @@ public class HLogUtil {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
}
/**
* Write a flush marker indicating a start / abort or a complete of a region flush
*/
public static long writeFlushMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException {
TableName tn = TableName.valueOf(f.getTableName().toByteArray());
HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
long trx = log.appendNoSync(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, null);
if (sync) log.sync(trx);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
return trx;
}
}

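A hedged sketch (not part of the commit) of driving the new writeFlushMarker helper against an HLog created on the local filesystem, roughly the way the test below sets one up; the directory, table, and WAL names are hypothetical.

import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteFlushMarkerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.getLocal(conf);
    Path logDir = new Path("/tmp/flush-marker-sketch");       // hypothetical local directory
    HLog log = HLogFactory.createHLog(fs, logDir, "demo-wal", conf);
    try {
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("example_table"));
      HRegionInfo hri = new HRegionInfo(htd.getTableName());
      AtomicLong sequenceId = new AtomicLong(1);
      FlushDescriptor start = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
          hri, 1L, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR)); // no files yet
      // sync = true makes the marker durable before returning
      long trx = HLogUtil.writeFlushMarker(log, htd, hri, start, sequenceId, true);
      System.out.println("flush marker appended at txid " + trx);
    } finally {
      log.close();
    }
  }
}

In the commit itself, HRegion passes sync = false for START_FLUSH (the sync happens later, outside the updates lock) and sync = true for COMMIT_FLUSH.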
WALEdit.java

@@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.Writable;
@@ -83,6 +85,8 @@ public class WALEdit implements Writable, HeapSize {
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
static final byte [] METAROW = Bytes.toBytes("METAROW");
static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
private final int VERSION_2 = -1;
private final boolean isReplay;
@@ -112,6 +116,10 @@ public class WALEdit implements Writable, HeapSize {
return Bytes.equals(METAFAMILY, f);
}
public static boolean isMetaEditFamily(Cell cell) {
return CellUtil.matchingFamily(cell, METAFAMILY);
}
/**
* @return True when current WALEdit is created by log replay. Replication skips WALEdits from
* replay.
@@ -256,6 +264,19 @@ public class WALEdit implements Writable, HeapSize {
return sb.toString();
}
public static WALEdit createFlushWALEdit(HRegionInfo hri, FlushDescriptor f) {
KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, FLUSH,
EnvironmentEdgeManager.currentTimeMillis(), f.toByteArray());
return new WALEdit().add(kv);
}
public static FlushDescriptor getFlushDescriptor(Cell cell) throws IOException {
if (CellUtil.matchingColumn(cell, METAFAMILY, FLUSH)) {
return FlushDescriptor.parseFrom(cell.getValue());
}
return null;
}
/**
* Create a compacion WALEdit
* @param c
@@ -264,7 +285,7 @@ public class WALEdit implements Writable, HeapSize {
public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
byte [] pbbytes = c.toByteArray();
KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
EnvironmentEdgeManager.currentTimeMillis(), pbbytes);
return new WALEdit().add(kv); //replication scope null so that this won't be replicated
}

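A hedged, self-contained sketch (not part of the commit) of the new WALEdit helpers working together: build a START_FLUSH marker edit and decode it back, the same pattern the test code below uses when scanning WAL entries. The region and table names are hypothetical.

import java.io.IOException;
import java.util.List;
import java.util.TreeMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class FlushMarkerEditSketch {
  public static void main(String[] args) throws IOException {
    HRegionInfo hri = new HRegionInfo(TableName.valueOf("example_table")); // hypothetical
    FlushDescriptor start = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
        hri, 7L, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));  // no files yet
    WALEdit edit = WALEdit.createFlushWALEdit(hri, start);  // one KeyValue in METAFAMILY:FLUSH
    Cell cell = edit.getKeyValues().get(0);
    if (WALEdit.isMetaEditFamily(cell)) {                    // filter meta edits first
      FlushDescriptor decoded = WALEdit.getFlushDescriptor(cell);
      System.out.println(decoded.getAction() + " seq=" + decoded.getFlushSequenceNumber());
    }
  }
}

getFlushDescriptor returns null for meta cells that are not FLUSH markers (for example compaction markers), which is why callers check the result before using it.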
TestHRegion.java

@@ -35,10 +35,12 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -111,6 +113,9 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
@@ -136,6 +141,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
@@ -786,6 +792,228 @@ public class TestHRegion {
}
}
@Test
public void testFlushMarkers() throws Exception {
// tests that flush markers are written to WAL and handled at recovered edits
String method = name.getMethodName();
TableName tableName = TableName.valueOf(method);
byte[] family = Bytes.toBytes("family");
Path logDir = TEST_UTIL.getDataTestDirOnTestFS("testRecoveredEditsIgnoreFlushMarkers.log");
HLog hlog = HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(),
TEST_UTIL.getConfiguration());
this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family);
try {
Path regiondir = region.getRegionFileSystem().getRegionDir();
FileSystem fs = region.getRegionFileSystem().getFileSystem();
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
long maxSeqId = 3;
long minSeqId = 0;
for (long i = minSeqId; i < maxSeqId; i++) {
Put put = new Put(Bytes.toBytes(i));
put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
region.put(put);
region.flushcache();
}
// this will create a region with 3 files from flush
assertEquals(3, region.getStore(family).getStorefilesCount());
List<String> storeFiles = new ArrayList<String>(3);
for (StoreFile sf : region.getStore(family).getStorefiles()) {
storeFiles.add(sf.getPath().getName());
}
// now verify that the flush markers are written
hlog.close();
HLog.Reader reader = HLogFactory.createReader(fs,
fs.listStatus(fs.listStatus(logDir)[0].getPath())[0].getPath(),
TEST_UTIL.getConfiguration());
List<HLog.Entry> flushDescriptors = new ArrayList<HLog.Entry>();
long lastFlushSeqId = -1;
while (true) {
HLog.Entry entry = reader.next();
if (entry == null) {
break;
}
Cell cell = entry.getEdit().getKeyValues().get(0);
if (WALEdit.isMetaEditFamily(cell)) {
FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
assertNotNull(flushDesc);
assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
if (flushDesc.getAction() == FlushAction.START_FLUSH) {
assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
} else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
}
lastFlushSeqId = flushDesc.getFlushSequenceNumber();
assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
assertEquals("family", storeFlushDesc.getStoreHomeDir());
if (flushDesc.getAction() == FlushAction.START_FLUSH) {
assertEquals(0, storeFlushDesc.getFlushOutputCount());
} else {
assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
}
flushDescriptors.add(entry);
}
}
assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
// now write those markers to the recovered edits again.
Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
fs.create(recoveredEdits);
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
for (HLog.Entry entry : flushDescriptors) {
writer.append(entry);
}
writer.close();
// close the region now, and reopen again
region.close();
region = HRegion.openHRegion(region, null);
// now check whether we can read back the data from the region
for (long i = minSeqId; i < maxSeqId; i++) {
Get get = new Get(Bytes.toBytes(i));
Result result = region.get(get);
byte[] value = result.getValue(family, Bytes.toBytes(i));
assertArrayEquals(Bytes.toBytes(i), value);
}
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
class IsFlushWALMarker extends ArgumentMatcher<WALEdit> {
volatile FlushAction[] actions;
public IsFlushWALMarker(FlushAction... actions) {
this.actions = actions;
}
@Override
public boolean matches(Object edit) {
List<KeyValue> kvs = ((WALEdit)edit).getKeyValues();
if (kvs.isEmpty()) {
return false;
}
if (WALEdit.isMetaEditFamily(kvs.get(0))) {
FlushDescriptor desc = null;
try {
desc = WALEdit.getFlushDescriptor(kvs.get(0));
} catch (IOException e) {
LOG.warn(e);
return false;
}
if (desc != null) {
for (FlushAction action : actions) {
if (desc.getAction() == action) {
return true;
}
}
}
}
return false;
}
public IsFlushWALMarker set(FlushAction... actions) {
this.actions = actions;
return this;
}
}
@Test
@SuppressWarnings("unchecked")
public void testFlushMarkersWALFail() throws Exception {
// test the cases where the WAL append for flush markers fail.
String method = name.getMethodName();
TableName tableName = TableName.valueOf(method);
byte[] family = Bytes.toBytes("family");
// spy an actual WAL implementation to throw exception (was not able to mock)
Path logDir = TEST_UTIL.getDataTestDirOnTestFS("testRecoveredEditsIgnoreFlushMarkers.log");
HLog hlog = spy(HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(),
TEST_UTIL.getConfiguration()));
this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family);
try {
int i = 0;
Put put = new Put(Bytes.toBytes(i));
put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
region.put(put);
// 1. Test case where START_FLUSH throws exception
IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH);
// throw exceptions if the WalEdit is a start flush action
when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
(WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(),
(List<KeyValue>)any()))
.thenThrow(new IOException("Fail to append flush marker"));
// start cache flush will throw exception
try {
region.flushcache();
fail("This should have thrown exception");
} catch (DroppedSnapshotException unexpected) {
// this should not be a dropped snapshot exception. Meaning that RS will not abort
throw unexpected;
} catch (IOException expected) {
// expected
}
// 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
isFlushWALMarker.set(FlushAction.COMMIT_FLUSH);
try {
region.flushcache();
fail("This should have thrown exception");
} catch (DroppedSnapshotException expected) {
// we expect this exception, since we were able to write the snapshot, but failed to
// write the flush marker to WAL
} catch (IOException unexpected) {
throw unexpected;
}
region.close();
this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family);
region.put(put);
// 3. Test case where ABORT_FLUSH will throw exception.
// Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
// DroppedSnapshotException. Below COMMIT_FLUSH will cause flush to abort
isFlushWALMarker.set(FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH);
try {
region.flushcache();
fail("This should have thrown exception");
} catch (DroppedSnapshotException expected) {
// we expect this exception, since we were able to write the snapshot, but failed to
// write the flush marker to WAL
} catch (IOException unexpected) {
throw unexpected;
}
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
@Test
public void testGetWhileRegionClose() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());