HBASE-15424 Add bulk load hfile-refs for replication in ZK after the event is appended in the WAL

This commit is contained in:
Ashish Singhi 2016-04-01 15:40:36 +05:30
parent 5d79790c55
commit 25419d8b18
7 changed files with 58 additions and 33 deletions

View File

@ -840,14 +840,14 @@ public abstract class AbstractFSWAL<W> implements WAL {
return true;
}
private long postAppend(final Entry e, final long elapsedTime) {
private long postAppend(final Entry e, final long elapsedTime) throws IOException {
long len = 0;
if (!listeners.isEmpty()) {
for (Cell cell : e.getEdit().getCells()) {
len += CellUtil.estimatedSerializedSizeOf(cell);
}
for (WALActionsListener listener : listeners) {
listener.postAppend(len, elapsedTime);
listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
}
}
return len;

View File

@ -20,9 +20,13 @@
package org.apache.hadoop.hbase.regionserver.wal;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.util.StringUtils;
@ -51,7 +55,8 @@ public class MetricsWAL extends WALActionsListener.Base {
}
@Override
public void postAppend(final long size, final long time) {
public void postAppend(final long size, final long time, final WALKey logkey,
final WALEdit logEdit) throws IOException {
source.incrementAppendCount();
source.incrementAppendTime(time);
source.incrementAppendSize(size);

View File

@ -98,8 +98,12 @@ public interface WALActionsListener {
* TODO: Combine this with above.
* @param entryLen approx length of cells in this append.
* @param elapsedTimeMillis elapsed time in milliseconds.
* @param logKey A WAL key
* @param logEdit A WAL edit containing list of cells.
* @throws IOException if any network or I/O error occurred
*/
void postAppend(final long entryLen, final long elapsedTimeMillis);
void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey,
final WALEdit logEdit) throws IOException;
/**
* For notification post writer sync. Used by metrics system at least.
@ -136,7 +140,9 @@ public interface WALActionsListener {
}
@Override
public void postAppend(final long entryLen, final long elapsedTimeMillis) {}
public void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey,
final WALEdit logEdit) throws IOException {
}
@Override
public void postSync(final long timeInNanos, final int handlerSyncs) {}

View File

@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -256,6 +257,34 @@ public class Replication extends WALActionsListener.Base implements
scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
}
@Override
public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
final WALEdit edit) throws IOException {
NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) {
TableName tableName = logKey.getTablename();
for (Cell c : edit.getCells()) {
// Only check for bulk load events
if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = null;
try {
bld = WALEdit.getBulkLoadDescriptor(c);
} catch (IOException e) {
LOG.error("Failed to get bulk load events information from the wal file.", e);
throw e;
}
for (StoreDescriptor s : bld.getStoresList()) {
byte[] fam = s.getFamilyName().toByteArray();
if (scopes.containsKey(fam)) {
addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
}
}
}
}
}
}
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits and if the scope is local.
@ -268,26 +297,9 @@ public class Replication extends WALActionsListener.Base implements
WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager)
throws IOException {
boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
byte[] family;
boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
try {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
for (StoreDescriptor s : bld.getStoresList()) {
family = s.getFamilyName().toByteArray();
addHFileRefsToQueue(replicationManager, logKey.getTablename(), family, s);
}
} catch (IOException e) {
LOG.error("Failed to get bulk load events information from the wal file.", e);
throw e;
}
} else {
// Skip the flush/compaction/region events
continue;
}
} else {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
foundOtherEdits = true;
}
}
@ -301,7 +313,7 @@ public class Replication extends WALActionsListener.Base implements
try {
replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
} catch (ReplicationException e) {
LOG.error("Failed to create hfile references in ZK.", e);
LOG.error("Failed to add hfile references in the replication queue.", e);
throw new IOException(e);
}
}

View File

@ -153,16 +153,17 @@ class DisabledWALProvider implements WALProvider {
}
@Override
public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) {
public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore)
throws IOException {
if (!this.listeners.isEmpty()) {
final long start = System.nanoTime();
long len = 0;
for (Cell cell : edits.getCells()) {
len += CellUtil.estimatedSerializedSizeOf(cell);
}
final long elapsed = (System.nanoTime() - start)/1000000l;
final long elapsed = (System.nanoTime() - start) / 1000000L;
for (WALActionsListener listener : this.listeners) {
listener.postAppend(len, elapsed);
listener.postAppend(len, elapsed, key, edits);
}
}
return -1;

View File

@ -60,10 +60,10 @@ public class TestMetricsWAL {
MetricsWALSource source = new MetricsWALSourceImpl();
MetricsWAL metricsWAL = new MetricsWAL(source);
// One not so slow append (< 1000)
metricsWAL.postAppend(1, 900);
metricsWAL.postAppend(1, 900, null, null);
// Two slow appends (> 1000)
metricsWAL.postAppend(1, 1010);
metricsWAL.postAppend(1, 2000);
metricsWAL.postAppend(1, 1010, null, null);
metricsWAL.postAppend(1, 2000, null, null);
assertEquals(2, source.getSlowAppendCount());
}
@ -71,8 +71,8 @@ public class TestMetricsWAL {
public void testWalWrittenInBytes() throws Exception {
MetricsWALSource source = mock(MetricsWALSourceImpl.class);
MetricsWAL metricsWAL = new MetricsWAL(source);
metricsWAL.postAppend(100, 900);
metricsWAL.postAppend(200, 2000);
metricsWAL.postAppend(100, 900, null, null);
metricsWAL.postAppend(200, 2000, null, null);
verify(source, times(1)).incrementWrittenBytes(100);
verify(source, times(1)).incrementWrittenBytes(200);
}

View File

@ -525,7 +525,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
}
@Override
public void postAppend(final long size, final long elapsedTime) {
public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
final WALEdit logEdit) {
appendMeter.mark(size);
}
});