HBASE-5197 [replication] Handle socket timeouts in ReplicationSource

to prevent DDOS


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1243733 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2012-02-13 23:12:55 +00:00
parent b50714c043
commit ce3bae0334
1 changed files with 18 additions and 3 deletions

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException; import java.io.EOFException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
@ -124,7 +125,9 @@ public class ReplicationSource extends Thread
// List of all the dead region servers that had this queue (if recovered) // List of all the dead region servers that had this queue (if recovered)
private String[] deadRegionServers; private String[] deadRegionServers;
// Maximum number of retries before taking bold actions // Maximum number of retries before taking bold actions
private long maxRetriesMultiplier; private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier;
// Current number of entries that we need to replicate // Current number of entries that we need to replicate
private int currentNbEntries = 0; private int currentNbEntries = 0;
// Current number of operations (Put/Delete) that we need to replicate // Current number of operations (Put/Delete) that we need to replicate
@ -166,7 +169,8 @@ public class ReplicationSource extends Thread
this.entriesArray[i] = new HLog.Entry(); this.entriesArray[i] = new HLog.Entry();
} }
this.maxRetriesMultiplier = this.maxRetriesMultiplier =
this.conf.getLong("replication.source.maxretriesmultiplier", 10); this.conf.getInt("replication.source.maxretriesmultiplier", 10);
this.socketTimeoutMultiplier = maxRetriesMultiplier * maxRetriesMultiplier;
this.queue = this.queue =
new PriorityBlockingQueue<Path>( new PriorityBlockingQueue<Path>(
conf.getInt("hbase.regionserver.maxlogs", 32), conf.getInt("hbase.regionserver.maxlogs", 32),
@ -619,9 +623,20 @@ public class ReplicationSource extends Thread
if (ioe instanceof RemoteException) { if (ioe instanceof RemoteException) {
ioe = ((RemoteException) ioe).unwrapRemoteException(); ioe = ((RemoteException) ioe).unwrapRemoteException();
LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
} else {
if (ioe instanceof SocketTimeoutException) {
// This exception means we waited for more than 60s and nothing
// happened, the cluster is alive and calling it right away
// even for a test just makes things worse.
sleepForRetries("Encountered a SocketTimeoutException. Since the" +
"call to the remote cluster timed out, which is usually " +
"caused by a machine failure or a massive slowdown",
this.socketTimeoutMultiplier);
} else { } else {
LOG.warn("Can't replicate because of a local or network error: ", ioe); LOG.warn("Can't replicate because of a local or network error: ", ioe);
} }
}
try { try {
boolean down; boolean down;
// Spin while the slave is down and we're not asked to shutdown/close // Spin while the slave is down and we're not asked to shutdown/close