HBASE-19990 Create remote wal directory when transiting to state S

This commit is contained in:
zhangduo 2018-02-14 16:01:16 +08:00
parent a41c549ca4
commit 0c97cda2a9
6 changed files with 55 additions and 17 deletions

View File

@ -15,16 +15,21 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.procedure2; package org.apache.hadoop.hbase.procedure2;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
// TODO: Not used yet /**
* Indicate that a procedure wants to be rescheduled. Usually because there is something wrong but
* we do not want to fail the procedure.
* <p>
* TODO: need to support scheduling after a delay.
*/
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Stable @InterfaceStability.Stable
public class ProcedureYieldException extends ProcedureException { public class ProcedureYieldException extends ProcedureException {
/** default constructor */ /** default constructor */
public ProcedureYieldException() { public ProcedureYieldException() {
super(); super();

View File

@ -41,6 +41,8 @@ public final class ReplicationUtils {
public static final String REPLICATION_ATTR_NAME = "__rep__"; public static final String REPLICATION_ATTR_NAME = "__rep__";
public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
private ReplicationUtils() { private ReplicationUtils() {
} }

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
@ -133,7 +134,6 @@ public class MasterFileSystem {
* Idempotent. * Idempotent.
*/ */
private void createInitialFileSystemLayout() throws IOException { private void createInitialFileSystemLayout() throws IOException {
final String[] protectedSubDirs = new String[] { final String[] protectedSubDirs = new String[] {
HConstants.BASE_NAMESPACE_DIR, HConstants.BASE_NAMESPACE_DIR,
HConstants.HFILE_ARCHIVE_DIRECTORY, HConstants.HFILE_ARCHIVE_DIRECTORY,
@ -145,7 +145,8 @@ public class MasterFileSystem {
HConstants.HREGION_LOGDIR_NAME, HConstants.HREGION_LOGDIR_NAME,
HConstants.HREGION_OLDLOGDIR_NAME, HConstants.HREGION_OLDLOGDIR_NAME,
HConstants.CORRUPT_DIR_NAME, HConstants.CORRUPT_DIR_NAME,
WALProcedureStore.MASTER_PROCEDURE_LOGDIR WALProcedureStore.MASTER_PROCEDURE_LOGDIR,
ReplicationUtils.REMOTE_WAL_DIR_NAME
}; };
// check if the root directory exists // check if the root directory exists
checkRootDir(this.rootdir, conf, this.fs); checkRootDir(this.rootdir, conf, this.fs);
@ -192,7 +193,9 @@ public class MasterFileSystem {
return this.fs; return this.fs;
} }
protected FileSystem getWALFileSystem() { return this.walFs; } public FileSystem getWALFileSystem() {
return this.walFs;
}
public Configuration getConfiguration() { public Configuration getConfiguration() {
return this.conf; return this.conf;
@ -234,13 +237,9 @@ public class MasterFileSystem {
} }
/** /**
* Get the rootdir. Make sure its wholesome and exists before returning. * Get the rootdir. Make sure its wholesome and exists before returning.
* @param rd * @return hbase.rootdir (after checks for existence and bootstrapping if needed populating the
* @param c * directory with necessary bootup files).
* @param fs
* @return hbase.rootdir (after checks for existence and bootstrapping if
* needed populating the directory with necessary bootup files).
* @throws IOException
*/ */
private Path checkRootDir(final Path rd, final Configuration c, final FileSystem fs) private Path checkRootDir(final Path rd, final Configuration c, final FileSystem fs)
throws IOException { throws IOException {

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
@ -139,6 +140,10 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return master.getReplicationPeerManager(); return master.getReplicationPeerManager();
} }
public MasterFileSystem getMasterFileSystem() {
return master.getMasterFileSystem();
}
public boolean isRunning() { public boolean isRunning() {
if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false; if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false;
return master.getMasterProcedureExecutor().isRunning(); return master.getMasterProcedureExecutor().isRunning();

View File

@ -20,14 +20,18 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -44,7 +48,7 @@ public class TransitPeerSyncReplicationStateProcedure
extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> { extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class); LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
private SyncReplicationState fromState; private SyncReplicationState fromState;
@ -67,8 +71,8 @@ public class TransitPeerSyncReplicationStateProcedure
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer); super.serializeStateData(serializer);
TransitPeerSyncReplicationStateStateData.Builder builder = TransitPeerSyncReplicationStateStateData.Builder builder =
TransitPeerSyncReplicationStateStateData.newBuilder() TransitPeerSyncReplicationStateStateData.newBuilder()
.setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState)); .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
if (fromState != null) { if (fromState != null) {
builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState)); builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
} }
@ -79,7 +83,7 @@ public class TransitPeerSyncReplicationStateProcedure
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer); super.deserializeStateData(serializer);
TransitPeerSyncReplicationStateStateData data = TransitPeerSyncReplicationStateStateData data =
serializer.deserialize(TransitPeerSyncReplicationStateStateData.class); serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState()); toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
if (data.hasFromState()) { if (data.hasFromState()) {
fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState()); fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
@ -205,7 +209,22 @@ public class TransitPeerSyncReplicationStateProcedure
} }
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case CREATE_DIR_FOR_REMOTE_WAL: case CREATE_DIR_FOR_REMOTE_WAL:
// TODO: create wal for write remote wal MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
Path remoteWALDirForPeer = new Path(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem();
try {
if (walFs.exists(remoteWALDirForPeer)) {
LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
remoteWALDirForPeer);
} else if (!walFs.mkdirs(remoteWALDirForPeer)) {
LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
throw new ProcedureYieldException();
}
} catch (IOException e) {
LOG.warn("Failed to create remote wal dir {}", remoteWALDirForPeer, e);
throw new ProcedureYieldException();
}
setNextState( setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;

View File

@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@ -154,8 +157,13 @@ public class TestSyncReplication {
@Test @Test
public void testStandby() throws Exception { public void testStandby() throws Exception {
MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
Path remoteWALDirForPeer = new Path(remoteWALDir, PEER_ID);
assertFalse(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY); SyncReplicationState.STANDBY);
assertTrue(mfs.getWALFileSystem().exists(remoteWALDirForPeer));
try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) { try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row")))); assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
assertDisallow(table, assertDisallow(table,