HBASE-23378: Clean Up FSUtil setClusterId (#910)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
belugabehr 2020-01-09 15:47:12 -05:00 committed by Michael Stack
parent 12b79a3907
commit f3bdb02280
1 changed files with 47 additions and 35 deletions

View File

@ -36,6 +36,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@ -82,7 +83,6 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -632,54 +632,66 @@ public abstract class FSUtils extends CommonFSUtils {
}
/**
* Writes a new unique identifier for this cluster to the "hbase.id" file
* in the HBase root directory
* Writes a new unique identifier for this cluster to the "hbase.id" file in the HBase root
* directory. If any operations on the ID file fails, and {@code wait} is a positive value, the
* method will retry to produce the ID file until the thread is forcibly interrupted.
*
* @param fs the root directory FileSystem
* @param rootdir the path to the HBase root directory
* @param clusterId the unique identifier to store
* @param wait how long (in milliseconds) to wait between retries
* @throws IOException if writing to the FileSystem fails and no wait value
*/
public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId,
int wait) throws IOException {
public static void setClusterId(final FileSystem fs, final Path rootdir,
final ClusterId clusterId, final long wait) throws IOException {
final Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
final Path tempDir = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY);
final Path tempIdFile = new Path(tempDir, HConstants.CLUSTER_ID_FILE_NAME);
LOG.debug("Create cluster ID file [{}] with ID: {}", idFile, clusterId);
while (true) {
try {
Path idFile = new Path(rootdir, HConstants.CLUSTER_ID_FILE_NAME);
Path tempIdFile = new Path(rootdir, HConstants.HBASE_TEMP_DIRECTORY +
Path.SEPARATOR + HConstants.CLUSTER_ID_FILE_NAME);
// Write the id file to a temporary location
FSDataOutputStream s = fs.create(tempIdFile);
try {
s.write(clusterId.toByteArray());
s.close();
s = null;
// Move the temporary file to its normal location. Throw an IOE if
// the rename failed
if (!fs.rename(tempIdFile, idFile)) {
throw new IOException("Unable to move temp version file to " + idFile);
}
} finally {
// Attempt to close the stream if still open on the way out
try {
if (s != null) s.close();
} catch (IOException ignore) { }
}
if (LOG.isDebugEnabled()) {
LOG.debug("Created cluster ID file at " + idFile.toString() + " with ID: " + clusterId);
}
return;
Optional<IOException> failure = Optional.empty();
LOG.debug("Write the cluster ID file to a temporary location: {}", tempIdFile);
try (FSDataOutputStream s = fs.create(tempIdFile)) {
s.write(clusterId.toByteArray());
} catch (IOException ioe) {
if (wait > 0) {
LOG.warn("Unable to create cluster ID file in " + rootdir.toString() +
", retrying in " + wait + "msec: " + StringUtils.stringifyException(ioe));
failure = Optional.of(ioe);
}
if (!failure.isPresent()) {
try {
LOG.debug("Move the temporary cluster ID file to its target location [{}]:[{}]",
tempIdFile, idFile);
if (!fs.rename(tempIdFile, idFile)) {
failure =
Optional.of(new IOException("Unable to move temp cluster ID file to " + idFile));
}
} catch (IOException ioe) {
failure = Optional.of(ioe);
}
}
if (failure.isPresent()) {
final IOException cause = failure.get();
if (wait > 0L) {
LOG.warn("Unable to create cluster ID file in {}, retrying in {}ms", rootdir, wait,
cause);
try {
Thread.sleep(wait);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
Thread.currentThread().interrupt();
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
continue;
} else {
throw ioe;
throw cause;
}
} else {
return;
}
}
}