HBASE-927 We don't recover if HRS hosting -ROOT-/.META. goes down - (back port from trunk)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/branches/0.18@723175 13f79535-47bb-0310-9956-ffa450edef68
parent 38b5c1f5c1
commit cfb9cb2946
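Note: read together, the ServerManager, ProcessServerShutdown and RegionManager hunks below make the master recover when the region server hosting -ROOT-/.META. dies instead of waiting forever. A rough sketch of the flow, assembled from those hunks (illustrative only, not the exact branch code):

// Illustrative sketch; all names below come from the hunks in this commit.
// When a region server lease expires, remember whether it was serving -ROOT-,
// drop the stale root location, and pass that flag to the shutdown processing.
HServerAddress root = master.getRootRegionLocation();
boolean rootServer = false;
if (root != null && root.equals(info.getServerAddress())) {
  master.regionManager.unsetRootRegion();   // forget the dead server's -ROOT- location
  rootServer = true;
}
master.toDoQueue.put(new ProcessServerShutdown(master, info, rootServer));
// ProcessServerShutdown.process() later calls
// master.regionManager.reassignRootRegion() once the dead server's log is split,
// which queues HRegionInfo.ROOT_REGIONINFO for reassignment.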
@@ -4,6 +4,9 @@ Release 0.18.2 - Unreleased

  BUG FIXES
   HBASE-602  HBase Crash when network card has a IPv6 address
   HBASE-927  We don't recover if HRS hosting -ROOT-/.META. goes down -
              (back port from trunk)

Release 0.18.1 - Released October 27, 2008
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.WritableComparable;
/**
 * This class encapsulates metrics for determining the load on a HRegionServer
 */
@SuppressWarnings("unchecked")
public class HServerLoad implements WritableComparable {
  private int numberOfRequests; // number of requests since last report
  private int numberOfRegions; // number of regions being served
@@ -22,9 +22,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.fs.FileSystem; //TODO: remove
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStoreFile; //TODO: remove
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.ipc.HRegionInterface;

@@ -121,7 +121,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
  final int metaRescanInterval;

  // A Sleeper that sleeps for threadWakeFrequency
  protected final Sleeper sleeper;
  private final Sleeper sleeper;

  // Default access so accesible from unit tests. MASTER is name of the webapp
  // and the attribute name used stuffing this instance into web context.

@@ -587,10 +587,10 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,

    for (int tries = 0; tries < numRetries; tries++) {
      try {
        // We can not access meta regions if they have not already been
        // assigned and scanned. If we timeout waiting, just shutdown.
        if (regionManager.waitForMetaRegionsOrClose()) {
          break;
        // We can not create a table unless meta regions have already been
        // assigned and scanned.
        if (!regionManager.areAllMetaRegionsOnline()) {
          throw new NotAllMetaRegionsOnlineException();
        }
        createTable(newRegion);
        LOG.info("created table " + desc.getNameAsString());
@@ -62,7 +62,7 @@ class MetaScanner extends BaseScanner {
    boolean scanSuccessful = false;
    while (!master.closed.get() && !regionManager.isInitialRootScanComplete() &&
      regionManager.getRootRegionLocation() == null) {
      master.sleeper.sleep();
      sleep();
    }
    if (master.closed.get()) {
      return scanSuccessful;
@@ -19,7 +19,6 @@
 */
package org.apache.hadoop.hbase.master;

import java.util.Map; //TODO: remove
import java.io.IOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -0,0 +1,44 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master;

import org.apache.hadoop.hbase.DoNotRetryIOException;

/**
 * Thrown when an operation requires the root and all meta regions to be online
 */
@SuppressWarnings("serial")
public class NotAllMetaRegionsOnlineException extends DoNotRetryIOException {
  /**
   * default constructor
   */
  public NotAllMetaRegionsOnlineException() {
    super();
  }

  /**
   * @param message
   */
  public NotAllMetaRegionsOnlineException(String message) {
    super(message);
  }

}
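The new exception extends DoNotRetryIOException, so callers fail fast instead of retrying while -ROOT-/.META. are unassigned. A minimal sketch of the guard pattern this patch applies in HMaster.createTable and RegionManager.getMetaRegionsForTable (the helper below is hypothetical, for illustration only):

// Hypothetical helper mirroring the guard used elsewhere in this patch.
void checkMetaOnline(RegionManager regionManager)
    throws NotAllMetaRegionsOnlineException {
  if (!regionManager.areAllMetaRegionsOnline()) {
    // Subclass of DoNotRetryIOException: callers should give up, not retry.
    throw new NotAllMetaRegionsOnlineException("-ROOT-/.META. not yet assigned");
  }
}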
@@ -96,13 +96,8 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
        regionInfo.getRegionName(), regionInfo.getStartKey());
      if (!master.regionManager.isInitialMetaScanComplete()) {
        // Put it on the queue to be scanned for the first time.
        try {
          LOG.debug("Adding " + m.toString() + " to regions to scan");
          master.regionManager.addMetaRegionToScan(m);
        } catch (InterruptedException e) {
          throw new RuntimeException(
            "Putting into metaRegionsToScan was interrupted.", e);
        }
        LOG.debug("Adding " + m.toString() + " to regions to scan");
        master.regionManager.addMetaRegionToScan(m);
      } else {
        // Add it to the online meta regions
        LOG.debug("Adding to onlineMetaRegions: " + m.toString());
@@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.io.RowResult;
class ProcessServerShutdown extends RegionServerOperation {
  private HServerAddress deadServer;
  private String deadServerName;
  private final boolean rootRegionServer;
  private boolean rootRegionReassigned = false;
  private Path oldLogDir;
  private boolean logSplit;
  private boolean rootRescanned;

@@ -64,11 +66,14 @@ class ProcessServerShutdown extends RegionServerOperation {
  /**
   * @param master
   * @param serverInfo
   * @param rootRegionServer
   */
  public ProcessServerShutdown(HMaster master, HServerInfo serverInfo) {
  public ProcessServerShutdown(HMaster master, HServerInfo serverInfo,
      boolean rootRegionServer) {
    super(master);
    this.deadServer = serverInfo.getServerAddress();
    this.deadServerName = this.deadServer.toString();
    this.rootRegionServer = rootRegionServer;
    this.logSplit = false;
    this.rootRescanned = false;
    this.oldLogDir =

@@ -244,6 +249,17 @@ class ProcessServerShutdown extends RegionServerOperation {
        logSplit = true;
      }

      if (this.rootRegionServer && !this.rootRegionReassigned) {
        // The server that died was serving the root region. Now that the log
        // has been split, get it reassigned.
        master.regionManager.reassignRootRegion();
        // avoid multiple root region reassignment
        this.rootRegionReassigned = true;
        // When we call rootAvailable below, it will put us on the delayed
        // to do queue to allow some time to pass during which the root
        // region will hopefully get reassigned.
      }

      if (!rootAvailable()) {
        // Return true so that worker does not put this request back on the
        // toDoQueue.
@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.util.Writables;
class RegionManager implements HConstants {
  protected static final Log LOG = LogFactory.getLog(RegionManager.class);

  private volatile AtomicReference<HServerAddress> rootRegionLocation =
  private AtomicReference<HServerAddress> rootRegionLocation =
    new AtomicReference<HServerAddress>(null);

  final Lock splitLogLock = new ReentrantLock();

@@ -160,6 +160,17 @@ class RegionManager implements HConstants {
    }
  }

  void unsetRootRegion() {
    rootRegionLocation.set(null);
  }

  void reassignRootRegion() {
    unsetRootRegion();
    if (!master.shutdownRequested) {
      unassignedRegions.put(HRegionInfo.ROOT_REGIONINFO, ZERO_L);
    }
  }

  /*
   * Assigns regions to region servers attempting to balance the load across
   * all region servers

@@ -493,10 +504,16 @@ class RegionManager implements HConstants {
   * Block until meta regions are online or we're shutting down.
   * @return true if we found meta regions, false if we're closing.
   */
  public boolean waitForMetaRegionsOrClose() {
    return metaScannerThread.waitForMetaRegionsOrClose();
  public boolean areAllMetaRegionsOnline() {
    boolean result = false;
    if (rootRegionLocation.get() != null &&
        numberOfMetaRegions.get() == onlineMetaRegions.size()) {
      result = true;
    }
    return result;
  }

  /**
   * Search our map of online meta regions to find the first meta region that
   * should contain a pointer to <i>newRegion</i>.

@@ -523,16 +540,25 @@ class RegionManager implements HConstants {
   * Get a set of all the meta regions that contain info about a given table.
   * @param tableName Table you need to know all the meta regions for
   * @return set of MetaRegion objects that contain the table
   * @throws NotAllMetaRegionsOnlineException
   */
  public Set<MetaRegion> getMetaRegionsForTable(byte [] tableName) {
  public Set<MetaRegion> getMetaRegionsForTable(byte [] tableName)
  throws NotAllMetaRegionsOnlineException {
    byte [] firstMetaRegion = null;
    Set<MetaRegion> metaRegions = new HashSet<MetaRegion>();

    if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
      if (rootRegionLocation.get() == null) {
        throw new NotAllMetaRegionsOnlineException(
          Bytes.toString(HConstants.ROOT_TABLE_NAME));
      }
      metaRegions.add(new MetaRegion(rootRegionLocation.get(),
        HRegionInfo.ROOT_REGIONINFO.getRegionName()));

    } else {
      if (!areAllMetaRegionsOnline()) {
        throw new NotAllMetaRegionsOnlineException();
      }
      synchronized (onlineMetaRegions) {
        if (onlineMetaRegions.size() == 1) {
          firstMetaRegion = onlineMetaRegions.firstKey();

@@ -592,9 +618,9 @@ class RegionManager implements HConstants {
   * @return list of MetaRegion objects
   */
  public List<MetaRegion> getListOfOnlineMetaRegions() {
    List<MetaRegion> regions = new ArrayList<MetaRegion>();
    List<MetaRegion> regions = null;
    synchronized(onlineMetaRegions) {
      regions.addAll(onlineMetaRegions.values());
      regions = new ArrayList<MetaRegion>(onlineMetaRegions.values());
    }
    return regions;
  }

@@ -805,9 +831,8 @@ class RegionManager implements HConstants {
  /**
   * Add a meta region to the scan queue
   * @param m MetaRegion that needs to get scanned
   * @throws InterruptedException
   */
  public void addMetaRegionToScan(MetaRegion m) throws InterruptedException {
  public void addMetaRegionToScan(MetaRegion m) {
    metaScannerThread.addMetaRegionToScan(m);
  }
@@ -69,12 +69,6 @@ abstract class RegionServerOperation implements Delayed, HConstants {

  protected boolean metaTableAvailable() {
    boolean available = true;
    if (LOG.isDebugEnabled()) {
      LOG.debug("numberOfMetaRegions: " +
        master.regionManager.numMetaRegions() +
        ", onlineMetaRegions.size(): " +
        master.regionManager.numOnlineMetaRegions());
    }
    if (master.regionManager.numMetaRegions() !=
      master.regionManager.numOnlineMetaRegions()) {
      // We can't proceed because not all of the meta regions are online.

@@ -83,6 +77,10 @@ abstract class RegionServerOperation implements Delayed, HConstants {
      // in the run queue, put this request on the delay queue to give
      // other threads the opportunity to get the meta regions on-line.
      if (LOG.isDebugEnabled()) {
        LOG.debug("numberOfMetaRegions: " +
          master.regionManager.numMetaRegions() +
          ", onlineMetaRegions.size(): " +
          master.regionManager.numOnlineMetaRegions());
        LOG.debug("Requeuing because not all meta regions are online");
      }
      available = false;
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Sleeper;

/**
 * Uses Callable pattern so that operations against meta regions do not need

@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface;
 */
abstract class RetryableMetaOperation<T> implements Callable<T> {
  protected final Log LOG = LogFactory.getLog(this.getClass());
  protected final Sleeper sleeper;
  protected final MetaRegion m;
  protected final HMaster master;

@@ -47,6 +49,7 @@ abstract class RetryableMetaOperation<T> implements Callable<T> {
  protected RetryableMetaOperation(MetaRegion m, HMaster master) {
    this.m = m;
    this.master = master;
    this.sleeper = new Sleeper(master.threadWakeFrequency, master.closed);
  }

  protected T doWithRetries()

@@ -89,7 +92,7 @@ abstract class RetryableMetaOperation<T> implements Callable<T> {
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      master.sleeper.sleep();
      sleeper.sleep();
    }
    return null;
  }
@@ -124,10 +124,17 @@ class ServerManager implements HConstants {
      // The startup message was from a known server with the same name.
      // Timeout the old one right away.
      HServerAddress root = master.getRootRegionLocation();
      boolean rootServer = false;
      if (root != null && root.equals(storedInfo.getServerAddress())) {
        master.regionManager.unassignRootRegion();
        master.regionManager.unsetRootRegion();
        rootServer = true;
      }
      try {
        master.toDoQueue.put(
          new ProcessServerShutdown(master, storedInfo, rootServer));
      } catch (InterruptedException e) {
        LOG.error("Insertion into toDoQueue was interrupted", e);
      }
      master.delayedToDoQueue.put(new ProcessServerShutdown(master, storedInfo));
    }

    // record new server

@@ -285,16 +292,18 @@ class ServerManager implements HConstants {
    serversToServerInfo.put(serverName, serverInfo);

    HServerLoad load = serversToLoad.get(serverName);
    if (load != null && !load.equals(serverInfo.getLoad())) {
      // We have previous information about the load on this server
      // and the load on this server has changed
      synchronized (loadToServers) {
        Set<String> servers = loadToServers.get(load);
    if (load != null) {
      if (!load.equals(serverInfo.getLoad())) {
        // We have previous information about the load on this server
        // and the load on this server has changed
        synchronized (loadToServers) {
          Set<String> servers = loadToServers.get(load);

        // Note that servers should never be null because loadToServers
        // and serversToLoad are manipulated in pairs
        servers.remove(serverName);
        loadToServers.put(load, servers);
          // Note that servers should never be null because loadToServers
          // and serversToLoad are manipulated in pairs
          servers.remove(serverName);
          loadToServers.put(load, servers);
        }
      }
    }

@@ -668,10 +677,12 @@ class ServerManager implements HConstants {
      LOG.info(server + " lease expired");
      // Remove the server from the known servers list and update load info
      HServerInfo info = serversToServerInfo.remove(server);
      boolean rootServer = false;
      if (info != null) {
        HServerAddress root = master.getRootRegionLocation();
        if (root != null && root.equals(info.getServerAddress())) {
          master.regionManager.unassignRootRegion();
          master.regionManager.unsetRootRegion();
          rootServer = true;
        }
        String serverName = info.getServerAddress().toString();
        HServerLoad load = serversToLoad.remove(serverName);

@@ -685,17 +696,16 @@ class ServerManager implements HConstants {
        }
      }
      deadServers.add(server);
      try {
        master.toDoQueue.put(
          new ProcessServerShutdown(master, info, rootServer));
      } catch (InterruptedException e) {
        LOG.error("insert into toDoQueue was interrupted", e);
      }
    }
    synchronized (serversToServerInfo) {
      serversToServerInfo.notifyAll();
    }

    // NOTE: If the server was serving the root region, we cannot reassign it
    // here because the new server will start serving the root region before
    // the ProcessServerShutdown operation has a chance to split the log file.
    if (info != null) {
      master.delayedToDoQueue.put(new ProcessServerShutdown(master, info));
    }
  }
}
@@ -622,85 +622,90 @@ public class HLog implements HConstants {
          LOG.debug("Splitting " + i + " of " + logfiles.length + ": " +
            logfiles[i].getPath());
        }
        // Check for empty file.
        if (logfiles[i].getLen() <= 0) {
          LOG.info("Skipping " + logfiles[i].toString() +
            " because zero length");
          continue;
        }
        // Check for possibly empty file. With appends, currently Hadoop reports
        // a zero length even if the file has been sync'd. Revisit if
        // HADOOP-4751 is committed.
        boolean possiblyEmpty = logfiles[i].getLen() <= 0;
        HLogKey key = new HLogKey();
        HLogEdit val = new HLogEdit();
        SequenceFile.Reader in =
          new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
        try {
          int count = 0;
          for (; in.next(key, val); count++) {
            byte [] tableName = key.getTablename();
            byte [] regionName = key.getRegionName();
            SequenceFile.Writer w = logWriters.get(regionName);
            if (w == null) {
              Path logfile = new Path(
                HRegion.getRegionDir(
                  HTableDescriptor.getTableDir(rootDir, tableName),
                  HRegionInfo.encodeRegionName(regionName)),
                HREGION_OLDLOGFILE_NAME);
              Path oldlogfile = null;
              SequenceFile.Reader old = null;
              if (fs.exists(logfile)) {
                LOG.warn("Old log file " + logfile +
                  " already exists. Copying existing file to new file");
                oldlogfile = new Path(logfile.toString() + ".old");
                fs.rename(logfile, oldlogfile);
                old = new SequenceFile.Reader(fs, oldlogfile, conf);
              }
              w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
                HLogEdit.class, getCompressionType(conf));
              // Use copy of regionName; regionName object is reused inside in
              // HStoreKey.getRegionName so its content changes as we iterate.
              logWriters.put(regionName, w);
              if (LOG.isDebugEnabled()) {
                LOG.debug("Creating new log file writer for path " + logfile +
                  " and region " + regionName);
              }

              if (old != null) {
                // Copy from existing log file
                HLogKey oldkey = new HLogKey();
                HLogEdit oldval = new HLogEdit();
                for (; old.next(oldkey, oldval); count++) {
                  if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
                    LOG.debug("Copied " + count + " edits");
                  }
                  w.append(oldkey, oldval);
        SequenceFile.Reader in =
          new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
        try {
          int count = 0;
          for (; in.next(key, val); count++) {
            byte [] tableName = key.getTablename();
            byte [] regionName = key.getRegionName();
            SequenceFile.Writer w = logWriters.get(regionName);
            if (w == null) {
              Path logfile = new Path(
                HRegion.getRegionDir(
                  HTableDescriptor.getTableDir(rootDir, tableName),
                  HRegionInfo.encodeRegionName(regionName)),
                HREGION_OLDLOGFILE_NAME);
              Path oldlogfile = null;
              SequenceFile.Reader old = null;
              if (fs.exists(logfile)) {
                LOG.warn("Old log file " + logfile +
                  " already exists. Copying existing file to new file");
                oldlogfile = new Path(logfile.toString() + ".old");
                fs.rename(logfile, oldlogfile);
                old = new SequenceFile.Reader(fs, oldlogfile, conf);
              }
              w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
                HLogEdit.class, getCompressionType(conf));
              // Use copy of regionName; regionName object is reused inside in
              // HStoreKey.getRegionName so its content changes as we iterate.
              logWriters.put(regionName, w);
              if (LOG.isDebugEnabled()) {
                LOG.debug("Creating new log file writer for path " + logfile +
                  " and region " + regionName);
              }

              if (old != null) {
                // Copy from existing log file
                HLogKey oldkey = new HLogKey();
                HLogEdit oldval = new HLogEdit();
                for (; old.next(oldkey, oldval); count++) {
                  if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
                    LOG.debug("Copied " + count + " edits");
                  }
                  w.append(oldkey, oldval);
                }
                old.close();
                fs.delete(oldlogfile, true);
              }
              old.close();
              fs.delete(oldlogfile, true);
            }
            w.append(key, val);
          }
            w.append(key, val);
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Applied " + count + " total edits from " +
              logfiles[i].getPath().toString());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Applied " + count + " total edits from " +
              logfiles[i].getPath().toString());
          }
        } catch (IOException e) {
          e = RemoteExceptionHandler.checkIOException(e);
          if (!(e instanceof EOFException)) {
            LOG.warn("Exception processing " + logfiles[i].getPath() +
              " -- continuing. Possible DATA LOSS!", e);
          }
        } finally {
          try {
            in.close();
          } catch (IOException e) {
            LOG.warn("Close in finally threw exception -- continuing", e);
          }
          // Delete the input file now so we do not replay edits. We could
          // have gotten here because of an exception. If so, probably
          // nothing we can do about it. Replaying it, it could work but we
          // could be stuck replaying for ever. Just continue though we
          // could have lost some edits.
          fs.delete(logfiles[i].getPath(), true);
        }
        } catch (IOException e) {
          e = RemoteExceptionHandler.checkIOException(e);
          if (!(e instanceof EOFException)) {
            LOG.warn("Exception processing " + logfiles[i].getPath() +
              " -- continuing. Possible DATA LOSS!", e);
          if (possiblyEmpty) {
            continue;
          }
        } finally {
          try {
            in.close();
          } catch (IOException e) {
            LOG.warn("Close in finally threw exception -- continuing", e);
          }
          // Delete the input file now so we do not replay edits. We could
          // have gotten here because of an exception. If so, probably
          // nothing we can do about it. Replaying it, it could work but we
          // could be stuck replaying for ever. Just continue though we
          // could have lost some edits.
          fs.delete(logfiles[i].getPath(), true);
          throw e;
        }
      }
    } finally {
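The HLog hunk above also changes how empty-looking log files are handled during log splitting: instead of skipping zero-length files outright, the new code remembers that a file was possibly empty and only skips it if the read actually fails, since with appends Hadoop can report zero length for a file that has been sync'd (see the HADOOP-4751 comment). A simplified, self-contained sketch of that decision logic, using hypothetical names rather than the real HLog/SequenceFile types:

import java.io.File;
import java.io.IOException;
import java.util.List;

// Plain-Java sketch of the "possibly empty" handling; illustrative only.
class LogSplitSketch {
  static void splitLogs(List<File> logs) throws IOException {
    for (File log : logs) {
      // A sync'd file may still report zero length, so do not skip it outright;
      // just note that it might be empty.
      boolean possiblyEmpty = log.length() <= 0;
      try {
        replay(log); // hypothetical stand-in for the per-file read/append loop
      } catch (IOException e) {
        if (possiblyEmpty) {
          continue; // nothing recoverable in an empty log; move to the next one
        }
        throw e; // a genuine read failure is surfaced to the caller
      }
    }
  }

  private static void replay(File log) throws IOException {
    // placeholder body for this sketch
  }
}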