diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java index 0ae27d0eada..be5a590f166 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication; import java.util.List; import java.util.SortedSet; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Pair; @@ -144,10 +145,11 @@ public interface ReplicationQueues { /** * Add new hfile references to the queue. * @param peerId peer cluster id to which the hfiles need to be replicated - * @param files list of hfile references to be added + * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which + * will be added in the queue } * @throws ReplicationException if fails to add a hfile reference */ - void addHFileRefs(String peerId, List files) throws ReplicationException; + void addHFileRefs(String peerId, List> pairs) throws ReplicationException; /** * Remove hfile references from the queue. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 7c548d9fbc4..1de1315534e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -319,16 +320,18 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R } @Override - public void addHFileRefs(String peerId, List files) throws ReplicationException { + public void addHFileRefs(String peerId, List> pairs) + throws ReplicationException { String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId); boolean debugEnabled = LOG.isDebugEnabled(); if (debugEnabled) { - LOG.debug("Adding hfile references " + files + " in queue " + peerZnode); + LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode); } List listOfOps = new ArrayList(); - int size = files.size(); + int size = pairs.size(); for (int i = 0; i < size; i++) { - listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)), + listOfOps.add(ZKUtilOp.createAndFailSilent( + ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()), HConstants.EMPTY_BYTE_ARRAY)); } if (debugEnabled) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java index 28b9bdf107f..1023e0d6455 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -307,7 +308,8 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase } @Override - public void addHFileRefs(String peerId, List files) throws ReplicationException { + public void addHFileRefs(String peerId, List> pairs) + throws ReplicationException { // TODO throw new NotImplementedException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 853d699342a..3c9d54f5676 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFa import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.UserProvider; @@ -524,6 +525,9 @@ public class HRegionServer extends HasThread implements checkCodecs(this.conf); this.userProvider = UserProvider.instantiate(conf); FSUtils.setupShortCircuitRead(this.conf); + + Replication.decorateRegionServerConfiguration(this.conf); + // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java index 256c24caa60..35aa1fbb32f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java @@ -380,7 +380,7 @@ public class HFileReplicator { } catch (FileNotFoundException e1) { // This will mean that the hfile does not exists any where in source cluster FS. So we // cannot do anything here just log and continue. - LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath + ". Hence ignoring this hfile from replication..", e1); continue; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 5f876905a5f..d3f9ba2cdb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.NavigableMap; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -32,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -44,8 +44,7 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -236,34 +235,6 @@ public class Replication extends WALActionsListener.Base implements scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager()); } - @Override - public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey, - final WALEdit edit) throws IOException { - NavigableMap scopes = logKey.getReplicationScopes(); - if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) { - TableName tableName = logKey.getTablename(); - for (Cell c : edit.getCells()) { - // Only check for bulk load events - if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) { - BulkLoadDescriptor bld = null; - try { - bld = WALEdit.getBulkLoadDescriptor(c); - } catch (IOException e) { - LOG.error("Failed to get bulk load events information from the wal file.", e); - throw e; - } - - for (StoreDescriptor s : bld.getStoresList()) { - byte[] fam = s.getFamilyName().toByteArray(); - if (scopes.containsKey(fam)) { - addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s); - } - } - } - } - } - } - /** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from * compaction WAL edits and if the scope is local. @@ -298,10 +269,10 @@ public class Replication extends WALActionsListener.Base implements } } - private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager, - TableName tableName, byte[] family, StoreDescriptor s) throws IOException { + void addHFileRefsToQueue(TableName tableName, byte[] family, List> pairs) + throws IOException { try { - replicationManager.addHFileRefs(tableName, family, s.getStoreFileList()); + this.replicationManager.addHFileRefs(tableName, family, pairs); } catch (ReplicationException e) { LOG.error("Failed to add hfile references in the replication queue.", e); throw new IOException(e); @@ -337,6 +308,22 @@ public class Replication extends WALActionsListener.Base implements } } + /** + * This method modifies the region server's configuration in order to inject replication-related + * features + * @param conf region server configurations + */ + public static void decorateRegionServerConfiguration(Configuration conf) { + if (isReplicationForBulkLoadDataEnabled(conf)) { + String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, ""); + String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName(); + if (!plugins.contains(rsCoprocessorClass)) { + conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + plugins + "," + rsCoprocessorClass); + } + } + } + /* * Statistics thread. Periodically prints the cache statistics to the log. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java new file mode 100644 index 00000000000..03046b435ef --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Pair; + +/** + * An Observer to facilitate replication operations + */ + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class ReplicationObserver extends BaseRegionObserver { + private static final Log LOG = LogFactory.getLog(ReplicationObserver.class); + + @Override + public void preCommitStoreFile(final ObserverContext ctx, + final byte[] family, final List> pairs) throws IOException { + RegionCoprocessorEnvironment env = ctx.getEnvironment(); + Configuration c = env.getConfiguration(); + if (pairs == null || pairs.isEmpty() + || !c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, + HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) { + LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded " + + "data replication."); + return; + } + HRegionServer rs = (HRegionServer) env.getRegionServerServices(); + Replication rep = (Replication) rs.getReplicationSourceService(); + rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 3eeb4b8227c..7a229eb2aeb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; @@ -253,7 +254,7 @@ public class ReplicationSource extends Thread } @Override - public void addHFileRefs(TableName tableName, byte[] family, List files) + public void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException { String peerId = peerClusterZnode; if (peerId.contains("-")) { @@ -266,8 +267,8 @@ public class ReplicationSource extends Thread List tableCfs = tableCFMap.get(tableName); if (tableCFMap.containsKey(tableName) && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) { - this.replicationQueues.addHFileRefs(peerId, files); - metrics.incrSizeOfHFileRefsQueue(files.size()); + this.replicationQueues.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); } else { LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " + Bytes.toString(family) + " to peer id " + peerId); @@ -275,8 +276,8 @@ public class ReplicationSource extends Thread } else { // user has explicitly not defined any table cfs for replication, means replicate all the // data - this.replicationQueues.addHFileRefs(peerId, files); - metrics.incrSizeOfHFileRefsQueue(files.size()); + this.replicationQueues.addHFileRefs(peerId, pairs); + metrics.incrSizeOfHFileRefsQueue(pairs.size()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 7f4a9f7fcf8..8d5451c6144 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.util.Pair; /** * Interface that defines a replication source @@ -112,10 +113,11 @@ public interface ReplicationSourceInterface { * Add hfile names to the queue to be replicated. * @param tableName Name of the table these files belongs to * @param family Name of the family these files belong to - * @param files files whose names needs to be added to the queue to be replicated + * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which + * will be added in the queue for replication} * @throws ReplicationException If failed to add hfile references */ - void addHFileRefs(TableName tableName, byte[] family, List files) + void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ef4093e0c82..5b574da254e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -846,10 +846,10 @@ public class ReplicationSourceManager implements ReplicationListener { return stats.toString(); } - public void addHFileRefs(TableName tableName, byte[] family, List files) + public void addHFileRefs(TableName tableName, byte[] family, List> pairs) throws ReplicationException { for (ReplicationSourceInterface source : this.sources) { - source.addHFileRefs(tableName, family, files); + source.addHFileRefs(tableName, family, pairs); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index fc3e516d07a..817cfb49b41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -138,8 +139,8 @@ public class TestReplicationHFileCleaner { + "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file))); - List files = new ArrayList(1); - files.add(file.getName()); + List> files = new ArrayList<>(1); + files.add(new Pair(null, file)); // 4. Add the file to hfile-refs queue rq.addHFileRefs(peerId, files); // 5. Assert file should not be deletable @@ -166,8 +167,8 @@ public class TestReplicationHFileCleaner { f.setPath(notDeletablefile); files.add(f); - List hfiles = new ArrayList<>(1); - hfiles.add(notDeletablefile.getName()); + List> hfiles = new ArrayList<>(1); + hfiles.add(new Pair(null, notDeletablefile)); // 2. Add one file to hfile-refs queue rq.addHFileRefs(peerId, hfiles); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index abe484eac79..57e54d77cc5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; +import org.apache.hadoop.hbase.util.Pair; /** * Source that does nothing at all, helpful to test ReplicationSourceManager @@ -93,7 +94,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void addHFileRefs(TableName tableName, byte[] family, List files) + public void addHFileRefs(TableName tableName, byte[] family, List> files) throws ReplicationException { return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index fcab105ac2f..f8be9a7bf59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -25,7 +25,9 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.zookeeper.KeeperException; import org.junit.Before; @@ -202,10 +204,10 @@ public abstract class TestReplicationStateBasic { rq1.init(server1); rqc.init(); - List files1 = new ArrayList(3); - files1.add("file_1"); - files1.add("file_2"); - files1.add("file_3"); + List> files1 = new ArrayList<>(3); + files1.add(new Pair(null, new Path("file_1"))); + files1.add(new Pair(null, new Path("file_2"))); + files1.add(new Pair(null, new Path("file_3"))); assertNull(rqc.getReplicableHFiles(ID_ONE)); assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size()); rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); @@ -213,13 +215,16 @@ public abstract class TestReplicationStateBasic { rq1.addHFileRefs(ID_ONE, files1); assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size()); assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size()); - List files2 = new ArrayList<>(files1); - String removedString = files2.remove(0); - rq1.removeHFileRefs(ID_ONE, files2); + List hfiles2 = new ArrayList<>(); + for (Pair p : files1) { + hfiles2.add(p.getSecond().getName()); + } + String removedString = hfiles2.remove(0); + rq1.removeHFileRefs(ID_ONE, hfiles2); assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size()); - files2 = new ArrayList<>(1); - files2.add(removedString); - rq1.removeHFileRefs(ID_ONE, files2); + hfiles2 = new ArrayList<>(1); + hfiles2.add(removedString); + rq1.removeHFileRefs(ID_ONE, hfiles2); assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size()); rp.unregisterPeer(ID_ONE); } @@ -235,10 +240,10 @@ public abstract class TestReplicationStateBasic { rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); rq1.addPeerToHFileRefs(ID_TWO); - List files1 = new ArrayList(3); - files1.add("file_1"); - files1.add("file_2"); - files1.add("file_3"); + List> files1 = new ArrayList<>(3); + files1.add(new Pair(null, new Path("file_1"))); + files1.add(new Pair(null, new Path("file_2"))); + files1.add(new Pair(null, new Path("file_3"))); rq1.addHFileRefs(ID_ONE, files1); rq1.addHFileRefs(ID_TWO, files1); assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());