HBASE-3033 [replication] ReplicationSink.replicateEntries improvements

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1001061 13f79535-47bb-0310-9956-ffa450edef68
Jean-Daniel Cryans 2010-09-24 20:37:44 +00:00
parent b05d9ac937
commit d4e9b8f4d9
2 changed files with 17 additions and 14 deletions

CHANGES.txt

@@ -932,6 +932,7 @@ Release 0.21.0 - Unreleased
    HBASE-3017 More log pruning
    HBASE-3022 Change format of enum messages in o.a.h.h.executor package
    HBASE-3001 Ship dependency jars to the cluster for all jobs
+   HBASE-3033 [replication] ReplicationSink.replicateEntries improvements
 
   NEW FEATURES
    HBASE-1961 HBase EC2 scripts

org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

@@ -22,7 +22,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Delete;
@@ -37,6 +36,8 @@ import org.apache.hadoop.hbase.Stoppable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -88,7 +89,7 @@ public class ReplicationSink {
    * @param entries
    * @throws IOException
    */
-  public synchronized void replicateEntries(HLog.Entry[] entries)
+  public void replicateEntries(HLog.Entry[] entries)
       throws IOException {
     if (entries.length == 0) {
       return;
@@ -97,8 +98,9 @@
     // to the same table.
     try {
       long totalReplicated = 0;
-      byte[] lastTable = HConstants.EMPTY_BYTE_ARRAY;
-      List<Put> puts = new ArrayList<Put>();
+      // Map of table => list of puts, we only want to flushCommits once per
+      // invocation of this method per table.
+      Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
       for (HLog.Entry entry : entries) {
         WALEdit edit = entry.getEdit();
         List<KeyValue> kvs = edit.getKeyValues();
@@ -115,9 +117,11 @@
           }
           delete(entry.getKey().getTablename(), delete);
         } else {
-          // Switching table, flush
-          if (!Bytes.equals(lastTable, entry.getKey().getTablename())) {
-            put(lastTable, puts);
+          byte[] table = entry.getKey().getTablename();
+          List<Put> tableList = puts.get(table);
+          if (tableList == null) {
+            tableList = new ArrayList<Put>();
+            puts.put(table, tableList);
           }
           // With mini-batching, we need to expect multiple rows per edit
           byte[] lastKey = kvs.get(0).getRow();
@@ -125,18 +129,19 @@
             kvs.get(0).getTimestamp());
           for (KeyValue kv : kvs) {
             if (!Bytes.equals(lastKey, kv.getRow())) {
-              puts.add(put);
+              tableList.add(put);
               put = new Put(kv.getRow(), kv.getTimestamp());
             }
             put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
             lastKey = kv.getRow();
           }
-          puts.add(put);
-          lastTable = entry.getKey().getTablename();
+          tableList.add(put);
         }
         totalReplicated++;
       }
-      put(lastTable, puts);
+      for(byte [] table : puts.keySet()) {
+        put(table, puts.get(table));
+      }
       this.metrics.setAgeOfLastAppliedOp(
         entries[entries.length-1].getKey().getWriteTime());
       this.metrics.appliedBatchesRate.inc(1);
@@ -175,8 +180,6 @@
       table = this.pool.getTable(tableName);
       table.put(puts);
       this.metrics.appliedOpsRate.inc(puts.size());
-      this.pool.putTable(table);
-      puts.clear();
     } finally {
       if (table != null) {
         this.pool.putTable(table);
@@ -196,7 +199,6 @@
       table = this.pool.getTable(tableName);
       table.delete(delete);
       this.metrics.appliedOpsRate.inc(1);
-      this.pool.putTable(table);
     } finally {
       if (table != null) {
         this.pool.putTable(table);
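The core of the change above is that replicateEntries now buffers Puts per table in a TreeMap keyed by table name (byte[] keys need Bytes.BYTES_COMPARATOR so they compare by content) and flushes each table's batch once at the end, instead of flushing every time the table switches. Below is a minimal standalone sketch of that pattern, assuming only the client classes Put and Bytes; PerTableBuffer and TableFlusher are illustrative names, not part of ReplicationSink.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PerTableBuffer {

  // Hypothetical callback standing in for ReplicationSink.put(byte[], List<Put>).
  public interface TableFlusher {
    void put(byte[] tableName, List<Put> puts) throws IOException;
  }

  // byte[] has no content-based compareTo, so the TreeMap needs an explicit
  // comparator; Bytes.BYTES_COMPARATOR orders table names by their bytes.
  private final Map<byte[], List<Put>> buffer =
      new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);

  // Accumulate a Put under its table; no RPC happens here.
  public void add(byte[] tableName, Put put) {
    List<Put> tableList = buffer.get(tableName);
    if (tableList == null) {
      tableList = new ArrayList<Put>();
      buffer.put(tableName, tableList);
    }
    tableList.add(put);
  }

  // Flush each table's batch exactly once per call, mirroring the single
  // loop over puts.keySet() at the end of replicateEntries.
  public void flushAll(TableFlusher flusher) throws IOException {
    for (Map.Entry<byte[], List<Put>> entry : buffer.entrySet()) {
      flusher.put(entry.getKey(), entry.getValue());
    }
    buffer.clear();
  }
}

Flushing once per table per invocation means interleaved edits for different tables no longer trigger a flush on every table switch, which is what the removed lastTable bookkeeping used to do.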