HBASE-20497 The getRecoveredQueueStartPos always returns 0 in RecoveredReplicationSourceShipper

Signed-off-by: zhangduo <zhangduo@apache.org>
huzheng 2018-04-28 11:14:43 +08:00 committed by zhangduo
parent 72093178fb
commit d38a104c4d
2 changed files with 48 additions and 23 deletions
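
The fix in the diff below changes the lookup key passed to ReplicationQueueStorage.getWALPosition() from the dead server that originally wrote the WALs (source.getServerWALsBelongTo()) to the server that now owns the recovered queue (source.getServer().getServerName()); with the old key no stored offset is found, so shipping restarts from position 0. A minimal, self-contained sketch of that effect (illustrative code only, not the HBase implementation; positionsByServer and lookupStartPos are made-up names):

// Illustrative only: a toy stand-in for the queue storage that keys WAL offsets
// by the server currently owning the queue. Looking the offset up under the dead
// server's name (the pre-HBASE-20497 behaviour) yields the 0 default.
import java.util.HashMap;
import java.util.Map;

public class RecoveredQueueStartPosSketch {

  // server name -> (WAL name -> byte offset); a made-up stand-in for the storage
  static long lookupStartPos(Map<String, Map<String, Long>> store, String server, String wal) {
    return store.getOrDefault(server, Map.of()).getOrDefault(wal, 0L);
  }

  public static void main(String[] args) {
    Map<String, Map<String, Long>> positionsByServer = new HashMap<>();
    String deadServer = "deadServer,16020,1";   // wrote the WALs, then died
    String liveServer = "liveServer,16020,2";   // claimed the recovered queue
    positionsByServer.put(liveServer, Map.of("wal.1", 1001L));

    // Old lookup key (source.getServerWALsBelongTo()): nothing stored -> 0
    System.out.println(lookupStartPos(positionsByServer, deadServer, "wal.1")); // 0
    // New lookup key (source.getServer().getServerName()): the saved offset
    System.out.println(lookupStartPos(positionsByServer, liveServer, "wal.1")); // 1001
  }
}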

RecoveredReplicationSourceShipper.java

@@ -60,7 +60,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
}
@Override
long getStartPosition() {
public long getStartPosition() {
long startPosition = getRecoveredQueueStartPos();
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
@@ -79,32 +79,30 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos() {
long startPosition = 0;
String peerClusterZnode = source.getQueueId();
String peerClusterZNode = source.getQueueId();
try {
startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
peerClusterZnode, this.queue.peek().getName());
if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " +
startPosition);
}
startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
peerClusterZNode, this.queue.peek().getName());
LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
startPosition);
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
}
return startPosition;
}
private void terminate(String reason, Exception cause) {
if (cause == null) {
LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason);
} else {
LOG.error("Closing worker for wal group " + this.walGroupId
+ " because an error occurred: " + reason, cause);
LOG.error(
"Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason,
cause);
}
entryReader.interrupt();
Threads.shutdown(entryReader, sleepForRetries);
this.interrupt();
Threads.shutdown(this, sleepForRetries);
LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
LOG.info("ReplicationSourceWorker {} terminated", this.getName());
}
}
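
Besides the lookup-key fix, this file's changes drop the explicit isTraceEnabled() guard and string concatenation in favour of SLF4J parameterized logging. A small sketch of that pattern, assuming an SLF4J Logger (a generic illustration, not the HBase class):

// With SLF4J's {} placeholders the message is only assembled when the level is
// enabled, so the explicit isTraceEnabled() guard around a simple trace call
// becomes unnecessary.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

  public static void main(String[] args) {
    String wal = "wal.1";
    long startPosition = 1001L;
    // Old style: guard plus string concatenation
    if (LOG.isTraceEnabled()) {
      LOG.trace("Recovered queue started with log " + wal + " at position " + startPosition);
    }
    // New style: placeholders, no guard needed for cheap arguments
    LOG.trace("Recovered queue started with log {} at position {}", wal, startPosition);
  }
}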

TestReplicationSource.java

@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -26,6 +26,7 @@ import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -36,12 +37,20 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -54,6 +63,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -80,9 +90,6 @@ public class TestReplicationSource {
private static Path logDir;
private static Configuration conf = TEST_UTIL.getConfiguration();
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniDFSCluster(1);
@@ -105,7 +112,6 @@ public class TestReplicationSource {
* Sanity check that we can move logs around while we are reading
* from them. Should this test fail, ReplicationSource would have a hard
* time reading logs that are being archived.
* @throws Exception
*/
@Test
public void testLogMoving() throws Exception{
@@ -185,15 +191,12 @@ public class TestReplicationSource {
public boolean evaluate() throws Exception {
return future.isDone();
}
});
}
/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
* @throws Exception
*/
@Test
public void testServerShutdownRecoveredQueue() throws Exception {
@@ -216,7 +219,7 @@ public class TestReplicationSource {
final String peerId = "TestPeer";
admin.addReplicationPeer(peerId,
new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()));
ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
// Wait for replication sources to come up
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
@@ -292,5 +295,29 @@ public class TestReplicationSource {
}
}
// Test HBASE-20497
@Test
public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
String walGroupId = "fake-wal-group-id";
ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
queue.put(new Path("/www/html/test"));
RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
Server server = Mockito.mock(Server.class);
Mockito.when(server.getServerName()).thenReturn(serverName);
Mockito.when(source.getServer()).thenReturn(server);
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
.thenReturn(1001L);
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
.thenReturn(-1L);
conf.setInt("replication.source.maxretriesmultiplier", -1);
RecoveredReplicationSourceShipper shipper =
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
Assert.assertEquals(1001L, shipper.getStartPosition());
conf.unset("replication.source.maxretriesmultiplier");
}
}
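
A note on the new test: it sets replication.source.maxretriesmultiplier to -1 so that the retry loop in getStartPosition() (the while (numRetries <= maxRetriesMultiplier) loop shown earlier) is never entered, and the position returned by the mocked ReplicationQueueStorage comes back unchanged. A tiny standalone illustration of that loop-skip, with made-up values matching the test (not HBase code):

// Illustrates why a negative maxRetriesMultiplier makes the shipper return the
// stored position directly: 0 <= -1 is false, so the retry loop never runs.
public class StartPositionLoopSketch {
  public static void main(String[] args) {
    long startPosition = 1001L;          // value the mocked storage returns
    int maxRetriesMultiplier = -1;       // from the test configuration
    int numRetries = 0;
    while (numRetries <= maxRetriesMultiplier) { // skipped entirely
      // the real loop would try to re-derive the position from the WAL file
      numRetries++;
    }
    System.out.println("start position = " + startPosition); // prints 1001
  }
}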