HBASE-20497 The getRecoveredQueueStartPos always return 0 in RecoveredReplicationSourceShipper
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
59f6ecd6b2
commit
a136303833
|
@ -60,7 +60,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
|
|||
}
|
||||
|
||||
@Override
|
||||
long getStartPosition() {
|
||||
public long getStartPosition() {
|
||||
long startPosition = getRecoveredQueueStartPos();
|
||||
int numRetries = 0;
|
||||
while (numRetries <= maxRetriesMultiplier) {
|
||||
|
@ -79,32 +79,30 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
|
|||
// normally has a position (unless the RS failed between 2 logs)
|
||||
private long getRecoveredQueueStartPos() {
|
||||
long startPosition = 0;
|
||||
String peerClusterZnode = source.getQueueId();
|
||||
String peerClusterZNode = source.getQueueId();
|
||||
try {
|
||||
startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
|
||||
peerClusterZnode, this.queue.peek().getName());
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " +
|
||||
startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
|
||||
peerClusterZNode, this.queue.peek().getName());
|
||||
LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
|
||||
startPosition);
|
||||
}
|
||||
} catch (ReplicationException e) {
|
||||
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
|
||||
terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
|
||||
}
|
||||
return startPosition;
|
||||
}
|
||||
|
||||
private void terminate(String reason, Exception cause) {
|
||||
if (cause == null) {
|
||||
LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
|
||||
|
||||
LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason);
|
||||
} else {
|
||||
LOG.error("Closing worker for wal group " + this.walGroupId
|
||||
+ " because an error occurred: " + reason, cause);
|
||||
LOG.error(
|
||||
"Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason,
|
||||
cause);
|
||||
}
|
||||
entryReader.interrupt();
|
||||
Threads.shutdown(entryReader, sleepForRetries);
|
||||
this.interrupt();
|
||||
Threads.shutdown(this, sleepForRetries);
|
||||
LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
|
||||
LOG.info("ReplicationSourceWorker {} terminated", this.getName());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
@ -26,6 +26,7 @@ import java.util.OptionalLong;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -36,12 +37,20 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
|
||||
|
@ -54,6 +63,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
|
|||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
@ -80,9 +90,6 @@ public class TestReplicationSource {
|
|||
private static Path logDir;
|
||||
private static Configuration conf = TEST_UTIL.getConfiguration();
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniDFSCluster(1);
|
||||
|
@ -105,7 +112,6 @@ public class TestReplicationSource {
|
|||
* Sanity check that we can move logs around while we are reading
|
||||
* from them. Should this test fail, ReplicationSource would have a hard
|
||||
* time reading logs that are being archived.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testLogMoving() throws Exception{
|
||||
|
@ -185,15 +191,12 @@ public class TestReplicationSource {
|
|||
public boolean evaluate() throws Exception {
|
||||
return future.isDone();
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that recovered queues are preserved on a regionserver shutdown.
|
||||
* See HBASE-18192
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testServerShutdownRecoveredQueue() throws Exception {
|
||||
|
@ -216,7 +219,7 @@ public class TestReplicationSource {
|
|||
|
||||
final String peerId = "TestPeer";
|
||||
admin.addReplicationPeer(peerId,
|
||||
new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()));
|
||||
ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
|
||||
// Wait for replication sources to come up
|
||||
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||
@Override public boolean evaluate() throws Exception {
|
||||
|
@ -292,5 +295,29 @@ public class TestReplicationSource {
|
|||
}
|
||||
}
|
||||
|
||||
// Test HBASE-20497
|
||||
@Test
|
||||
public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
|
||||
String walGroupId = "fake-wal-group-id";
|
||||
ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
|
||||
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
|
||||
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
|
||||
queue.put(new Path("/www/html/test"));
|
||||
RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
|
||||
Server server = Mockito.mock(Server.class);
|
||||
Mockito.when(server.getServerName()).thenReturn(serverName);
|
||||
Mockito.when(source.getServer()).thenReturn(server);
|
||||
Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
|
||||
ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
|
||||
Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
|
||||
.thenReturn(1001L);
|
||||
Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
|
||||
.thenReturn(-1L);
|
||||
conf.setInt("replication.source.maxretriesmultiplier", -1);
|
||||
RecoveredReplicationSourceShipper shipper =
|
||||
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
|
||||
Assert.assertEquals(1001L, shipper.getStartPosition());
|
||||
conf.unset("replication.source.maxretriesmultiplier");
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue