HBASE-15424 Add bulk load hfile-refs for replication in ZK after the event is appended in the WAL

This commit is contained in:
Ashish Singhi 2016-04-01 15:55:08 +05:30
parent d7d12aedd4
commit bcbef7b401
7 changed files with 53 additions and 14 deletions

View File

@ -1399,14 +1399,14 @@ public class FSHLog implements WAL {
}
}
private long postAppend(final Entry e, final long elapsedTime) {
private long postAppend(final Entry e, final long elapsedTime) throws IOException {
long len = 0;
if (!listeners.isEmpty()) {
for (Cell cell : e.getEdit().getCells()) {
len += CellUtil.estimatedSerializedSizeOf(cell);
}
for (WALActionsListener listener : listeners) {
listener.postAppend(len, elapsedTime);
listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
}
}
return len;

View File

@ -20,9 +20,13 @@
package org.apache.hadoop.hbase.regionserver.wal;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.util.StringUtils;
@ -51,7 +55,8 @@ public class MetricsWAL extends WALActionsListener.Base {
}
@Override
public void postAppend(final long size, final long time) {
public void postAppend(final long size, final long time, final WALKey logkey,
final WALEdit logEdit) throws IOException {
source.incrementAppendCount();
source.incrementAppendTime(time);
source.incrementAppendSize(size);

View File

@ -101,8 +101,12 @@ public interface WALActionsListener {
* TODO: Combine this with above.
* @param entryLen approx length of cells in this append.
* @param elapsedTimeMillis elapsed time in milliseconds.
* @param logKey A WAL key
* @param logEdit A WAL edit containing list of cells.
* @throws IOException if any network or I/O occurred
*/
void postAppend(final long entryLen, final long elapsedTimeMillis);
void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey,
final WALEdit logEdit) throws IOException;
/**
* For notification post writer sync. Used by metrics system at least.
@ -140,7 +144,9 @@ public interface WALActionsListener {
}
@Override
public void postAppend(final long entryLen, final long elapsedTimeMillis) {}
public void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey,
final WALEdit logEdit) throws IOException {
}
@Override
public void postSync(final long timeInNanos, final int handlerSyncs) {}

View File

@ -262,6 +262,36 @@ public class Replication extends WALActionsListener.Base implements
scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
}
@Override
public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
final WALEdit edit) throws IOException {
NavigableMap<byte[], Integer> scopes = logKey.getScopes();
if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) {
TableName tableName = logKey.getTablename();
for (Cell c : edit.getCells()) {
// Only check for bulk load events
if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = null;
try {
bld = WALEdit.getBulkLoadDescriptor(c);
} catch (IOException e) {
LOG.error("Failed to get bulk load events information from the wal file.", e);
throw e;
}
for (StoreDescriptor s : bld.getStoresList()) {
byte[] fam = s.getFamilyName().toByteArray();
// We have already scoped the entries as part
// WALActionsListener#visitLogEntryBeforeWrite notification
if (scopes.containsKey(fam)) {
addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
}
}
}
}
}
}
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits and if the scope is local.
@ -314,10 +344,7 @@ public class Replication extends WALActionsListener.Base implements
int scope = htd.getFamily(family).getScope();
if (scope != REPLICATION_SCOPE_LOCAL) {
scopes.put(family, scope);
addHFileRefsToQueue(replicationManager, tableName, family, s);
}
} else {
addHFileRefsToQueue(replicationManager, tableName, family, s);
}
}
} catch (IOException e) {
@ -331,7 +358,7 @@ public class Replication extends WALActionsListener.Base implements
try {
replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
} catch (ReplicationException e) {
LOG.error("Failed to create hfile references in ZK.", e);
LOG.error("Failed to add hfile references in the replication queue.", e);
throw new IOException(e);
}
}

View File

@ -155,7 +155,7 @@ class DisabledWALProvider implements WALProvider {
@Override
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
boolean inMemstore) {
boolean inMemstore) throws IOException {
if (!this.listeners.isEmpty()) {
final long start = System.nanoTime();
long len = 0;
@ -164,7 +164,7 @@ class DisabledWALProvider implements WALProvider {
}
final long elapsed = (System.nanoTime() - start)/1000000l;
for (WALActionsListener listener : this.listeners) {
listener.postAppend(len, elapsed);
listener.postAppend(len, elapsed, key, edits);
}
}
return -1;

View File

@ -57,8 +57,8 @@ public class TestMetricsWAL {
public void testWalWrittenInBytes() throws Exception {
MetricsWALSource source = mock(MetricsWALSourceImpl.class);
MetricsWAL metricsWAL = new MetricsWAL(source);
metricsWAL.postAppend(100, 900);
metricsWAL.postAppend(200, 2000);
metricsWAL.postAppend(100, 900, null, null);
metricsWAL.postAppend(200, 2000, null, null);
verify(source, times(1)).incrementWrittenBytes(100);
verify(source, times(1)).incrementWrittenBytes(200);
}

View File

@ -514,7 +514,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
}
@Override
public void postAppend(final long size, final long elapsedTime) {
public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
final WALEdit logEdit) {
appendMeter.mark(size);
}
});