HBASE-11512 Write region open/close events to WAL

This commit is contained in:
Enis Soztutar 2014-08-19 18:45:21 -07:00
parent aeecd20373
commit d44e7df5dc
9 changed files with 2755 additions and 29 deletions

View File

@ -120,6 +120,8 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.TablePermission;
import org.apache.hadoop.hbase.security.access.UserPermission;
@ -2529,6 +2531,30 @@ public final class ProtobufUtil {
return desc.build();
}
public static RegionEventDescriptor toRegionEventDescriptor(
EventType eventType, HRegionInfo hri, long seqId, ServerName server,
Map<byte[], List<Path>> storeFiles) {
RegionEventDescriptor.Builder desc = RegionEventDescriptor.newBuilder()
.setEventType(eventType)
.setTableName(ByteStringer.wrap(hri.getTable().getName()))
.setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
.setLogSequenceNumber(seqId)
.setServer(toServerName(server));
for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
RegionEventDescriptor.StoreDescriptor.Builder builder
= RegionEventDescriptor.StoreDescriptor.newBuilder()
.setFamilyName(ByteStringer.wrap(entry.getKey()))
.setStoreHomeDir(Bytes.toString(entry.getKey()));
for (Path path : entry.getValue()) {
builder.addStoreFile(path.getName());
}
desc.addStores(builder);
}
return desc.build();
}
/**
* Return short version of Message toString'd, shorter than TextFormat#shortDebugString.
* Tries to NOT print out data both because it can be big but also so we do not have data in our

View File

@ -120,6 +120,29 @@ message FlushDescriptor {
repeated StoreFlushDescriptor store_flushes = 5;
}
/**
* Special WAL entry to hold all related to a region event (open/close).
*/
message RegionEventDescriptor {
enum EventType {
REGION_OPEN = 0;
REGION_CLOSE = 1;
}
message StoreDescriptor {
required bytes family_name = 1;
required string store_home_dir = 2; //relative to region dir
repeated string store_file = 3; // relative to store dir
}
required EventType event_type = 1;
required bytes table_name = 2;
required bytes encoded_region_name = 3;
optional uint64 log_sequence_number = 4;
repeated StoreDescriptor stores = 5;
optional ServerName server = 6; // Server who opened the region
}
/**
* A trailer that is appended to the end of a properly closed HLog WAL file.
* If missing, this is either a legacy or a corrupted WAL file.

View File

@ -120,6 +120,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@ -725,6 +726,8 @@ public class HRegion implements HeapSize { // , Writable{
status.setStatus("Writing region info on filesystem");
fs.checkRegionInfoOnFilesystem();
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
long maxSeqId = initializeRegionStores(reporter, status);
@ -761,6 +764,7 @@ public class HRegion implements HeapSize { // , Writable{
// overlaps used sequence numbers
nextSeqid += this.flushPerChanges + 10000000; // add another extra 10million
}
LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
"; next sequenceid=" + nextSeqid);
@ -850,6 +854,44 @@ public class HRegion implements HeapSize { // , Writable{
return maxSeqId;
}
private void writeRegionOpenMarker(HLog log, long openSeqId) throws IOException {
Map<byte[], List<Path>> storeFiles
= new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
Store store = entry.getValue();
ArrayList<Path> storeFileNames = new ArrayList<Path>();
for (StoreFile storeFile: store.getStorefiles()) {
storeFileNames.add(storeFile.getPath());
}
storeFiles.put(entry.getKey(), storeFileNames);
}
RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionOpenDesc,
getSequenceId());
}
private void writeRegionCloseMarker(HLog log) throws IOException {
Map<byte[], List<Path>> storeFiles
= new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Store> entry : getStores().entrySet()) {
Store store = entry.getValue();
ArrayList<Path> storeFileNames = new ArrayList<Path>();
for (StoreFile storeFile: store.getStorefiles()) {
storeFileNames.add(storeFile.getPath());
}
storeFiles.put(entry.getKey(), storeFileNames);
}
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
getRegionServerServices().getServerName(), storeFiles);
HLogUtil.writeRegionEventMarker(log, getTableDesc(), getRegionInfo(), regionEventDesc,
getSequenceId());
}
/**
* @return True if this region has references.
*/
@ -1227,6 +1269,12 @@ public class HRegion implements HeapSize { // , Writable{
storeCloserThreadPool.shutdownNow();
}
}
status.setStatus("Writing region close event to WAL");
if (!abort && log != null && getRegionServerServices() != null) {
writeRegionCloseMarker(log);
}
this.closed.set(true);
if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
if (coprocessorHost != null) {
@ -3550,6 +3598,7 @@ public class HRegion implements HeapSize { // , Writable{
}
return storeFileNames;
}
//////////////////////////////////////////////////////////////////////////////
// Support code
//////////////////////////////////////////////////////////////////////////////
@ -4601,6 +4650,9 @@ public class HRegion implements HeapSize { // , Writable{
this.openSeqNum = initialize(reporter);
this.setSequenceId(openSeqNum);
if (log != null && getRegionServerServices() != null) {
writeRegionOpenMarker(log, openSeqNum);
}
return this;
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.util.FSUtils;
import com.google.protobuf.TextFormat;
@ -284,4 +285,20 @@ public class HLogUtil {
}
return trx;
}
/**
* Write a region open marker indicating that the region is opened
*/
public static long writeRegionEventMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
final RegionEventDescriptor r, AtomicLong sequenceId) throws IOException {
TableName tn = TableName.valueOf(r.getTableName().toByteArray());
HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
long trx = log.appendNoSync(htd, info, key, WALEdit.createRegionEventWALEdit(info, r),
sequenceId, false, null);
log.sync(trx);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
return trx;
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -86,6 +87,7 @@ public class WALEdit implements Writable, HeapSize {
static final byte [] METAROW = Bytes.toBytes("METAROW");
static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
static final byte [] REGION_EVENT = Bytes.toBytes("HBASE::REGION_EVENT");
private final int VERSION_2 = -1;
private final boolean isReplay;
@ -277,6 +279,20 @@ public class WALEdit implements Writable, HeapSize {
return null;
}
public static WALEdit createRegionEventWALEdit(HRegionInfo hri,
RegionEventDescriptor regionEventDesc) {
KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, REGION_EVENT,
EnvironmentEdgeManager.currentTimeMillis(), regionEventDesc.toByteArray());
return new WALEdit().add(kv);
}
public static RegionEventDescriptor getRegionEventDescriptor(Cell cell) throws IOException {
if (CellUtil.matchingColumn(cell, METAFAMILY, REGION_EVENT)) {
return RegionEventDescriptor.parseFrom(cell.getValue());
}
return null;
}
/**
* Create a compacion WALEdit
* @param c

View File

@ -955,6 +955,7 @@ public class TestDistributedLogSplitting {
"table", "family", NUM_LOG_LINES, 100);
new Thread() {
@Override
public void run() {
waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
for (RegionServerThread rst : rsts) {
@ -1497,9 +1498,12 @@ public class TestDistributedLogSplitting {
throws IOException {
int count = 0;
HLog.Reader in = HLogFactory.createReader(fs, log, conf);
while (in.next() != null) {
HLog.Entry e;
while ((e = in.next()) != null) {
if (!WALEdit.isMetaEditFamily(e.getEdit().getKeyValues().get(0))) {
count++;
}
}
return count;
}

View File

@ -34,8 +34,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@ -86,6 +88,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
@ -114,8 +117,10 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
@ -141,6 +146,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
@ -5481,6 +5487,136 @@ public class TestHRegion {
this.region = null;
}
@Test
@SuppressWarnings("unchecked")
public void testOpenRegionWrittenToWAL() throws Exception {
final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWAL", 100, 42);
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
HTableDescriptor htd
= new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWAL"));
htd.addFamily(new HColumnDescriptor(fam1));
htd.addFamily(new HColumnDescriptor(fam2));
HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
// open the region w/o rss and log and flush some files
HRegion region =
HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
.getConfiguration(), htd);
assertNotNull(region);
// create a file in fam1 for the region before opening in OpenRegionHandler
region.put(new Put(Bytes.toBytes("a")).add(fam1, fam1, fam1));
region.flushcache();
region.close();
ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
// capture appendNoSync() calls
HLog log = mock(HLog.class);
when(rss.getWAL((HRegionInfo) any())).thenReturn(log);
try {
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
TEST_UTIL.getConfiguration(), rss, null);
verify(log, times(1)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any()
, editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<KeyValue>)any());
WALEdit edit = editCaptor.getValue();
assertNotNull(edit);
assertNotNull(edit.getKeyValues());
assertEquals(1, edit.getKeyValues().size());
RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getKeyValues().get(0));
assertNotNull(desc);
LOG.info("RegionEventDescriptor from WAL: " + desc);
assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
hri.getEncodedNameAsBytes()));
assertTrue(desc.getLogSequenceNumber() > 0);
assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
assertEquals(2, desc.getStoresCount());
StoreDescriptor store = desc.getStores(0);
assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
assertEquals(1, store.getStoreFileCount()); // 1store file
assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
store = desc.getStores(1);
assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
assertEquals(0, store.getStoreFileCount()); // no store files
} finally {
HRegion.closeHRegion(region);
}
}
@Test
@SuppressWarnings("unchecked")
public void testCloseRegionWrittenToWAL() throws Exception {
final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
HTableDescriptor htd
= new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWAL"));
htd.addFamily(new HColumnDescriptor(fam1));
htd.addFamily(new HColumnDescriptor(fam2));
HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
// capture appendNoSync() calls
HLog log = mock(HLog.class);
when(rss.getWAL((HRegionInfo) any())).thenReturn(log);
// open a region first so that it can be closed later
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
TEST_UTIL.getConfiguration(), rss, null);
// close the region
region.close(false);
// 2 times, one for region open, the other close region
verify(log, times(2)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<KeyValue>)any());
WALEdit edit = editCaptor.getAllValues().get(1);
assertNotNull(edit);
assertNotNull(edit.getKeyValues());
assertEquals(1, edit.getKeyValues().size());
RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getKeyValues().get(0));
assertNotNull(desc);
LOG.info("RegionEventDescriptor from WAL: " + desc);
assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
hri.getEncodedNameAsBytes()));
assertTrue(desc.getLogSequenceNumber() > 0);
assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
assertEquals(2, desc.getStoresCount());
StoreDescriptor store = desc.getStores(0);
assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
assertEquals(0, store.getStoreFileCount()); // no store files
store = desc.getStores(1);
assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
assertEquals(0, store.getStoreFileCount()); // no store files
}
private static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@ -354,6 +355,7 @@ public class TestWALReplay {
User user = HBaseTestingUtility.getDifferentUser(newConf,
tableName.getNameAsString());
user.runAs(new PrivilegedExceptionAction() {
@Override
public Object run() throws Exception {
runWALSplit(newConf);
HLog wal2 = createWAL(newConf);
@ -425,6 +427,7 @@ public class TestWALReplay {
User user = HBaseTestingUtility.getDifferentUser(newConf,
tableName.getNameAsString());
user.runAs(new PrivilegedExceptionAction() {
@Override
public Object run() throws Exception {
runWALSplit(newConf);
HLog wal2 = createWAL(newConf);
@ -518,6 +521,7 @@ public class TestWALReplay {
User user = HBaseTestingUtility.getDifferentUser(newConf,
tableName.getNameAsString());
user.runAs(new PrivilegedExceptionAction() {
@Override
public Object run() throws Exception {
runWALSplit(newConf);
FileSystem newFS = FileSystem.get(newConf);
@ -669,6 +673,7 @@ public class TestWALReplay {
HLog wal = createWAL(this.conf);
RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
Mockito.doReturn(false).when(rsServices).isAborted();
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
Configuration customConf = new Configuration(this.conf);
customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
CustomStoreFlusher.class.getName());
@ -802,6 +807,7 @@ public class TestWALReplay {
User user = HBaseTestingUtility.getDifferentUser(newConf,
".replay.wal.secondtime");
user.runAs(new PrivilegedExceptionAction() {
@Override
public Object run() throws Exception {
runWALSplit(newConf);
FileSystem newFS = FileSystem.get(newConf);
@ -813,6 +819,7 @@ public class TestWALReplay {
try {
final HRegion region =
new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
@Override
protected FlushResult internalFlushcache(
final HLog wal, final long myseqid, MonitoredTask status)
throws IOException {