HBASE-9158 Serious bug in cyclic replication
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1512089 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eb197cbf04
commit
3bc9e2c95c
|
@ -20,14 +20,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -45,14 +44,12 @@ import org.apache.hadoop.hbase.Stoppable;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
|
||||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||||
import org.apache.hadoop.hbase.client.Mutation;
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Row;
|
import org.apache.hadoop.hbase.client.Row;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is responsible for replicating the edits coming
|
* This class is responsible for replicating the edits coming
|
||||||
|
@ -75,7 +72,6 @@ public class ReplicationSink {
|
||||||
// Name of the HDFS directory that contains the temporary rep logs
|
// Name of the HDFS directory that contains the temporary rep logs
|
||||||
public static final String REPLICATION_LOG_DIR = ".replogs";
|
public static final String REPLICATION_LOG_DIR = ".replogs";
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final ExecutorService sharedThreadPool;
|
|
||||||
private final HConnection sharedHtableCon;
|
private final HConnection sharedHtableCon;
|
||||||
private final MetricsSink metrics;
|
private final MetricsSink metrics;
|
||||||
private final AtomicLong totalReplicatedEdits = new AtomicLong();
|
private final AtomicLong totalReplicatedEdits = new AtomicLong();
|
||||||
|
@ -93,11 +89,6 @@ public class ReplicationSink {
|
||||||
decorateConf();
|
decorateConf();
|
||||||
this.metrics = new MetricsSink();
|
this.metrics = new MetricsSink();
|
||||||
this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
|
this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
|
||||||
this.sharedThreadPool = new ThreadPoolExecutor(1,
|
|
||||||
conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE),
|
|
||||||
conf.getLong("hbase.htable.threads.keepalivetime", 60), TimeUnit.SECONDS,
|
|
||||||
new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-repl"));
|
|
||||||
((ThreadPoolExecutor) this.sharedThreadPool).allowCoreThreadTimeOut(true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -125,10 +116,9 @@ public class ReplicationSink {
|
||||||
// to the same table.
|
// to the same table.
|
||||||
try {
|
try {
|
||||||
long totalReplicated = 0;
|
long totalReplicated = 0;
|
||||||
// Map of table => list of Rows, we only want to flushCommits once per
|
// Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
|
||||||
// invocation of this method per table.
|
// invocation of this method per table and cluster id.
|
||||||
Map<TableName, List<Row>> rows =
|
Map<TableName, Map<UUID,List<Row>>> rowMap = new TreeMap<TableName, Map<UUID,List<Row>>>();
|
||||||
new TreeMap<TableName, List<Row>>();
|
|
||||||
for (WALEntry entry : entries) {
|
for (WALEntry entry : entries) {
|
||||||
TableName table =
|
TableName table =
|
||||||
TableName.valueOf(entry.getKey().getTableName().toByteArray());
|
TableName.valueOf(entry.getKey().getTableName().toByteArray());
|
||||||
|
@ -148,7 +138,7 @@ public class ReplicationSink {
|
||||||
new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
|
new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
|
||||||
new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
|
||||||
m.setClusterId(uuid);
|
m.setClusterId(uuid);
|
||||||
addToMultiMap(rows, table, m);
|
addToHashMultiMap(rowMap, table, uuid, m);
|
||||||
}
|
}
|
||||||
if (CellUtil.isDelete(cell)) {
|
if (CellUtil.isDelete(cell)) {
|
||||||
((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
|
((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
|
||||||
|
@ -159,8 +149,8 @@ public class ReplicationSink {
|
||||||
}
|
}
|
||||||
totalReplicated++;
|
totalReplicated++;
|
||||||
}
|
}
|
||||||
for (Entry<TableName, List<Row>> entry : rows.entrySet()) {
|
for (Entry<TableName, Map<UUID,List<Row>>> entry : rowMap.entrySet()) {
|
||||||
batch(entry.getKey(), entry.getValue());
|
batch(entry.getKey(), entry.getValue().values());
|
||||||
}
|
}
|
||||||
int size = entries.size();
|
int size = entries.size();
|
||||||
this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
|
this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
|
||||||
|
@ -190,15 +180,21 @@ public class ReplicationSink {
|
||||||
* Simple helper to a map from key to (a list of) values
|
* Simple helper to a map from key to (a list of) values
|
||||||
* TODO: Make a general utility method
|
* TODO: Make a general utility method
|
||||||
* @param map
|
* @param map
|
||||||
* @param key
|
* @param key1
|
||||||
|
* @param key2
|
||||||
* @param value
|
* @param value
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
private <K, V> List<V> addToMultiMap(Map<K, List<V>> map, K key, V value) {
|
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
|
||||||
List<V> values = map.get(key);
|
Map<K2,List<V>> innerMap = map.get(key1);
|
||||||
|
if (innerMap == null) {
|
||||||
|
innerMap = new HashMap<K2, List<V>>();
|
||||||
|
map.put(key1, innerMap);
|
||||||
|
}
|
||||||
|
List<V> values = innerMap.get(key2);
|
||||||
if (values == null) {
|
if (values == null) {
|
||||||
values = new ArrayList<V>();
|
values = new ArrayList<V>();
|
||||||
map.put(key, values);
|
innerMap.put(key2, values);
|
||||||
}
|
}
|
||||||
values.add(value);
|
values.add(value);
|
||||||
return values;
|
return values;
|
||||||
|
@ -208,15 +204,6 @@ public class ReplicationSink {
|
||||||
* stop the thread pool executor. It is called when the regionserver is stopped.
|
* stop the thread pool executor. It is called when the regionserver is stopped.
|
||||||
*/
|
*/
|
||||||
public void stopReplicationSinkServices() {
|
public void stopReplicationSinkServices() {
|
||||||
try {
|
|
||||||
this.sharedThreadPool.shutdown();
|
|
||||||
if (!this.sharedThreadPool.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
|
|
||||||
this.sharedThreadPool.shutdownNow();
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn("Interrupted while closing the table pool", e); // ignoring it as we are closing.
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
this.sharedHtableCon.close();
|
this.sharedHtableCon.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -231,14 +218,16 @@ public class ReplicationSink {
|
||||||
* @param rows list of actions
|
* @param rows list of actions
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void batch(TableName tableName, List<Row> rows) throws IOException {
|
private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
|
||||||
if (rows.isEmpty()) {
|
if (allRows.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
HTableInterface table = null;
|
HTableInterface table = null;
|
||||||
try {
|
try {
|
||||||
table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool);
|
table = this.sharedHtableCon.getTable(tableName);
|
||||||
table.batch(rows);
|
for (List<Row> rows : allRows) {
|
||||||
|
table.batch(rows);
|
||||||
|
}
|
||||||
} catch (InterruptedException ix) {
|
} catch (InterruptedException ix) {
|
||||||
throw new IOException(ix);
|
throw new IOException(ix);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -73,6 +73,8 @@ public class TestMasterReplication {
|
||||||
private static final byte[] row = Bytes.toBytes("row");
|
private static final byte[] row = Bytes.toBytes("row");
|
||||||
private static final byte[] row1 = Bytes.toBytes("row1");
|
private static final byte[] row1 = Bytes.toBytes("row1");
|
||||||
private static final byte[] row2 = Bytes.toBytes("row2");
|
private static final byte[] row2 = Bytes.toBytes("row2");
|
||||||
|
private static final byte[] row3 = Bytes.toBytes("row3");
|
||||||
|
private static final byte[] row4 = Bytes.toBytes("row4");
|
||||||
private static final byte[] noRepfamName = Bytes.toBytes("norep");
|
private static final byte[] noRepfamName = Bytes.toBytes("norep");
|
||||||
|
|
||||||
private static final byte[] count = Bytes.toBytes("count");
|
private static final byte[] count = Bytes.toBytes("count");
|
||||||
|
@ -178,6 +180,21 @@ public class TestMasterReplication {
|
||||||
assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete));
|
assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete));
|
||||||
assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete));
|
assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete));
|
||||||
assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete));
|
assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete));
|
||||||
|
|
||||||
|
// Test HBASE-9158
|
||||||
|
admin2.disablePeer("1");
|
||||||
|
// we now have an edit that was replicated into cluster originating from cluster 1
|
||||||
|
putAndWait(row3, famName, htable1, htable2);
|
||||||
|
// now add a local edit to cluster 2
|
||||||
|
Put put = new Put(row4);
|
||||||
|
put.add(famName, row4, row4);
|
||||||
|
htable2.put(put);
|
||||||
|
// reenable replication from cluster 2 to cluster 3
|
||||||
|
admin2.enablePeer("1");
|
||||||
|
// without HBASE-9158 the edit for row4 would have been marked with cluster 1's id
|
||||||
|
// and hence not replicated to cluster 1
|
||||||
|
wait(row4, htable1);
|
||||||
|
|
||||||
utility3.shutdownMiniCluster();
|
utility3.shutdownMiniCluster();
|
||||||
utility2.shutdownMiniCluster();
|
utility2.shutdownMiniCluster();
|
||||||
utility1.shutdownMiniCluster();
|
utility1.shutdownMiniCluster();
|
||||||
|
@ -271,6 +288,10 @@ public class TestMasterReplication {
|
||||||
put.add(fam, row, row);
|
put.add(fam, row, row);
|
||||||
source.put(put);
|
source.put(put);
|
||||||
|
|
||||||
|
wait(row, target);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void wait(byte[] row, HTable target) throws Exception {
|
||||||
Get get = new Get(row);
|
Get get = new Get(row);
|
||||||
for (int i = 0; i < NB_RETRIES; i++) {
|
for (int i = 0; i < NB_RETRIES; i++) {
|
||||||
if (i==NB_RETRIES-1) {
|
if (i==NB_RETRIES-1) {
|
||||||
|
@ -284,7 +305,7 @@ public class TestMasterReplication {
|
||||||
assertArrayEquals(res.value(), row);
|
assertArrayEquals(res.value(), row);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue