HBASE-17290 Potential loss of data for replication of bulk loaded hfiles
parent 629b04f44f
commit 5f631b9653
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 import java.util.SortedSet;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Pair;
 
@@ -144,10 +145,11 @@ public interface ReplicationQueues {
   /**
    * Add new hfile references to the queue.
    * @param peerId peer cluster id to which the hfiles need to be replicated
-   * @param files list of hfile references to be added
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+   *          will be added in the queue }
    * @throws ReplicationException if fails to add a hfile reference
    */
-  void addHFileRefs(String peerId, List<String> files) throws ReplicationException;
+  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
 
   /**
    * Remove hfile references from the queue.
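For illustration, a minimal caller-side sketch of the revised addHFileRefs contract follows; the peer id, file paths, and the already-initialized ReplicationQueues instance are hypothetical, not part of the commit. The first element of each pair is the HFile's location in the staging directory, the second its path in the region (store) directory.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.util.Pair;

// Sketch only: builds the pair list expected by the new signature and queues it.
public class AddHFileRefsSketch {
  static void queueBulkLoadedFiles(ReplicationQueues queues) throws ReplicationException {
    List<Pair<Path, Path>> pairs = new ArrayList<>();
    // first = HFile location in the staging directory (hypothetical path),
    // second = HFile path in the region/store directory (hypothetical path)
    pairs.add(new Pair<>(
        new Path("/hbase/staging/user__t1__abcdef/cf/hfile-0001"),
        new Path("/hbase/data/default/t1/1588230740/cf/hfile-0001")));
    queues.addHFileRefs("peer-1", pairs); // "peer-1" is a hypothetical peer id
  }
}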
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -319,16 +320,18 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   }
 
   @Override
-  public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
+  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+      throws ReplicationException {
     String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
     boolean debugEnabled = LOG.isDebugEnabled();
     if (debugEnabled) {
-      LOG.debug("Adding hfile references " + files + " in queue " + peerZnode);
+      LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
     }
     List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
-    int size = files.size();
+    int size = pairs.size();
     for (int i = 0; i < size; i++) {
-      listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)),
+      listOfOps.add(ZKUtilOp.createAndFailSilent(
+        ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
         HConstants.EMPTY_BYTE_ARRAY));
     }
     if (debugEnabled) {
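A small sketch (hypothetical paths) of the naming behavior above: the znode created under the peer's hfile-refs node takes its name from the second element of each pair, i.e. only the HFile's file name from the region directory, not the staging-directory path.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Pair;

public class HFileRefNameSketch {
  public static void main(String[] args) {
    Pair<Path, Path> pair = new Pair<>(
        new Path("/hbase/staging/user__t1__abcdef/cf/hfile-0001"),    // staging dir copy
        new Path("/hbase/data/default/t1/1588230740/cf/hfile-0001")); // region dir path
    // The reference recorded in ZooKeeper is just the file name:
    System.out.println(pair.getSecond().getName()); // prints "hfile-0001"
  }
}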
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -307,7 +308,8 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
   }
 
   @Override
-  public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
+  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+      throws ReplicationException {
     // TODO
     throw new NotImplementedException();
   }
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFa
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -524,6 +525,9 @@ public class HRegionServer extends HasThread implements
     checkCodecs(this.conf);
     this.userProvider = UserProvider.instantiate(conf);
     FSUtils.setupShortCircuitRead(this.conf);
+
+    Replication.decorateRegionServerConfiguration(this.conf);
+
     // Disable usage of meta replicas in the regionserver
     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
 
@@ -380,7 +380,7 @@ public class HFileReplicator {
       } catch (FileNotFoundException e1) {
         // This will mean that the hfile does not exists any where in source cluster FS. So we
         // cannot do anything here just log and continue.
-        LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+        LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
           + ". Hence ignoring this hfile from replication..",
           e1);
         continue;
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.NavigableMap;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -32,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,8 +44,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -236,34 +235,6 @@ public class Replication extends WALActionsListener.Base implements
     scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
   }
 
-  @Override
-  public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
-      final WALEdit edit) throws IOException {
-    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
-    if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) {
-      TableName tableName = logKey.getTablename();
-      for (Cell c : edit.getCells()) {
-        // Only check for bulk load events
-        if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
-          BulkLoadDescriptor bld = null;
-          try {
-            bld = WALEdit.getBulkLoadDescriptor(c);
-          } catch (IOException e) {
-            LOG.error("Failed to get bulk load events information from the wal file.", e);
-            throw e;
-          }
-
-          for (StoreDescriptor s : bld.getStoresList()) {
-            byte[] fam = s.getFamilyName().toByteArray();
-            if (scopes.containsKey(fam)) {
-              addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
-            }
-          }
-        }
-      }
-    }
-  }
-
   /**
    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
    * compaction WAL edits and if the scope is local.
@@ -298,10 +269,10 @@ public class Replication extends WALActionsListener.Base implements
       }
     }
 
-  private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
-      TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
+  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
+      throws IOException {
     try {
-      replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
+      this.replicationManager.addHFileRefs(tableName, family, pairs);
     } catch (ReplicationException e) {
       LOG.error("Failed to add hfile references in the replication queue.", e);
       throw new IOException(e);
@@ -337,6 +308,22 @@ public class Replication extends WALActionsListener.Base implements
     }
   }
 
+  /**
+   * This method modifies the region server's configuration in order to inject replication-related
+   * features
+   * @param conf region server configurations
+   */
+  public static void decorateRegionServerConfiguration(Configuration conf) {
+    if (isReplicationForBulkLoadDataEnabled(conf)) {
+      String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, "");
+      String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName();
+      if (!plugins.contains(rsCoprocessorClass)) {
+        conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+          plugins + "," + rsCoprocessorClass);
+      }
+    }
+  }
+
   /*
    * Statistics thread. Periodically prints the cache statistics to the log.
    */
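A rough sketch of the effect of the new hook, assuming HConstants.REPLICATION_BULKLOAD_ENABLE_KEY is the flag consulted by isReplicationForBulkLoadDataEnabled: with bulk-load replication enabled, decorating a configuration appends ReplicationObserver to the region server coprocessor list.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.replication.regionserver.Replication;

public class DecorateConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    Replication.decorateRegionServerConfiguration(conf);
    // Expect the coprocessor list to now include ReplicationObserver
    System.out.println(conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY));
  }
}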
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An Observer to facilitate replication operations
+ */
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class ReplicationObserver extends BaseRegionObserver {
+  private static final Log LOG = LogFactory.getLog(ReplicationObserver.class);
+
+  @Override
+  public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+    RegionCoprocessorEnvironment env = ctx.getEnvironment();
+    Configuration c = env.getConfiguration();
+    if (pairs == null || pairs.isEmpty()
+        || !c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
+      LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded "
+          + "data replication.");
+      return;
+    }
+    HRegionServer rs = (HRegionServer) env.getRegionServerServices();
+    Replication rep = (Replication) rs.getReplicationSourceService();
+    rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
+  }
+}
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -253,7 +254,7 @@ public class ReplicationSource extends Thread
   }
 
   @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
     String peerId = peerClusterZnode;
     if (peerId.contains("-")) {
@@ -266,8 +267,8 @@ public class ReplicationSource extends Thread
         List<String> tableCfs = tableCFMap.get(tableName);
         if (tableCFMap.containsKey(tableName)
             && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
-          this.replicationQueues.addHFileRefs(peerId, files);
-          metrics.incrSizeOfHFileRefsQueue(files.size());
+          this.replicationQueues.addHFileRefs(peerId, pairs);
+          metrics.incrSizeOfHFileRefsQueue(pairs.size());
        } else {
          LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
              + Bytes.toString(family) + " to peer id " + peerId);
@@ -275,8 +276,8 @@ public class ReplicationSource extends Thread
     } else {
       // user has explicitly not defined any table cfs for replication, means replicate all the
       // data
-      this.replicationQueues.addHFileRefs(peerId, files);
-      metrics.incrSizeOfHFileRefsQueue(files.size());
+      this.replicationQueues.addHFileRefs(peerId, pairs);
+      metrics.incrSizeOfHFileRefsQueue(pairs.size());
     }
   }
 
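To make the per-peer filtering above easier to follow, here is a standalone sketch of the same decision (the helper name and standalone form are mine, not the source's): a null table-CFs map for the peer means replicate everything; otherwise the table must be listed and the family either unrestricted (null list) or explicitly named before the hfile refs are queued.

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

public class HFileRefsFilterSketch {
  // Mirrors the condition used in ReplicationSource.addHFileRefs before queueing refs.
  static boolean shouldQueue(Map<TableName, List<String>> tableCFMap,
      TableName table, byte[] family) {
    if (tableCFMap == null) {
      return true; // no table-CFs config for the peer: replicate all data
    }
    List<String> cfs = tableCFMap.get(table);
    return tableCFMap.containsKey(table)
        && (cfs == null || cfs.contains(Bytes.toString(family)));
  }
}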
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Interface that defines a replication source
@@ -112,10 +113,11 @@ public interface ReplicationSourceInterface {
   * Add hfile names to the queue to be replicated.
   * @param tableName Name of the table these files belongs to
   * @param family Name of the family these files belong to
-   * @param files files whose names needs to be added to the queue to be replicated
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+   *          will be added in the queue for replication}
   * @throws ReplicationException If failed to add hfile references
   */
-  void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException;
 
 }
@@ -846,10 +846,10 @@ public class ReplicationSourceManager implements ReplicationListener {
     return stats.toString();
   }
 
-  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
     for (ReplicationSourceInterface source : this.sources) {
-      source.addHFileRefs(tableName, family, files);
+      source.addHFileRefs(tableName, family, pairs);
     }
   }
 
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -138,8 +139,8 @@ public class TestReplicationHFileCleaner {
         + "for it in the queue.",
       cleaner.isFileDeletable(fs.getFileStatus(file)));
 
-    List<String> files = new ArrayList<String>(1);
-    files.add(file.getName());
+    List<Pair<Path, Path>> files = new ArrayList<>(1);
+    files.add(new Pair<Path, Path>(null, file));
     // 4. Add the file to hfile-refs queue
     rq.addHFileRefs(peerId, files);
     // 5. Assert file should not be deletable
@@ -166,8 +167,8 @@ public class TestReplicationHFileCleaner {
     f.setPath(notDeletablefile);
     files.add(f);
 
-    List<String> hfiles = new ArrayList<>(1);
-    hfiles.add(notDeletablefile.getName());
+    List<Pair<Path, Path>> hfiles = new ArrayList<>(1);
+    hfiles.add(new Pair<Path, Path>(null, notDeletablefile));
     // 2. Add one file to hfile-refs queue
     rq.addHFileRefs(peerId, hfiles);
 
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -93,7 +94,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
       throws ReplicationException {
     return;
   }
@@ -25,7 +25,9 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
@@ -202,10 +204,10 @@ public abstract class TestReplicationStateBasic {
     rq1.init(server1);
     rqc.init();
 
-    List<String> files1 = new ArrayList<String>(3);
-    files1.add("file_1");
-    files1.add("file_2");
-    files1.add("file_3");
+    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+    files1.add(new Pair<Path, Path>(null, new Path("file_1")));
+    files1.add(new Pair<Path, Path>(null, new Path("file_2")));
+    files1.add(new Pair<Path, Path>(null, new Path("file_3")));
     assertNull(rqc.getReplicableHFiles(ID_ONE));
     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
     rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
@@ -213,13 +215,16 @@ public abstract class TestReplicationStateBasic {
     rq1.addHFileRefs(ID_ONE, files1);
     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
     assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
-    List<String> files2 = new ArrayList<>(files1);
-    String removedString = files2.remove(0);
-    rq1.removeHFileRefs(ID_ONE, files2);
+    List<String> hfiles2 = new ArrayList<>();
+    for (Pair<Path, Path> p : files1) {
+      hfiles2.add(p.getSecond().getName());
+    }
+    String removedString = hfiles2.remove(0);
+    rq1.removeHFileRefs(ID_ONE, hfiles2);
     assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
-    files2 = new ArrayList<>(1);
-    files2.add(removedString);
-    rq1.removeHFileRefs(ID_ONE, files2);
+    hfiles2 = new ArrayList<>(1);
+    hfiles2.add(removedString);
+    rq1.removeHFileRefs(ID_ONE, hfiles2);
     assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
     rp.unregisterPeer(ID_ONE);
   }
@@ -235,10 +240,10 @@ public abstract class TestReplicationStateBasic {
     rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
     rq1.addPeerToHFileRefs(ID_TWO);
 
-    List<String> files1 = new ArrayList<String>(3);
-    files1.add("file_1");
-    files1.add("file_2");
-    files1.add("file_3");
+    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+    files1.add(new Pair<Path, Path>(null, new Path("file_1")));
+    files1.add(new Pair<Path, Path>(null, new Path("file_2")));
+    files1.add(new Pair<Path, Path>(null, new Path("file_3")));
     rq1.addHFileRefs(ID_ONE, files1);
     rq1.addHFileRefs(ID_TWO, files1);
     assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());