HBASE-19973 Implement a procedure to replay sync replication wal for standby cluster
This commit is contained in:
parent
45794d4156
commit
183b8d0581
|
@ -480,3 +480,25 @@ message TransitPeerSyncReplicationStateStateData {
|
|||
optional SyncReplicationState fromState = 1;
|
||||
required SyncReplicationState toState = 2;
|
||||
}
|
||||
|
||||
enum RecoverStandbyState {
|
||||
RENAME_SYNC_REPLICATION_WALS_DIR = 1;
|
||||
INIT_WORKERS = 2;
|
||||
DISPATCH_TASKS = 3;
|
||||
REMOVE_SYNC_REPLICATION_WALS_DIR = 4;
|
||||
}
|
||||
|
||||
message RecoverStandbyStateData {
|
||||
required string peer_id = 1;
|
||||
}
|
||||
|
||||
message ReplaySyncReplicationWALStateData {
|
||||
required string peer_id = 1;
|
||||
required string wal = 2;
|
||||
optional ServerName target_server = 3;
|
||||
}
|
||||
|
||||
message ReplaySyncReplicationWALParameter {
|
||||
required string peer_id = 1;
|
||||
required string wal = 2;
|
||||
}
|
||||
|
|
|
@ -281,7 +281,14 @@ public enum EventType {
|
|||
*
|
||||
* RS_REFRESH_PEER
|
||||
*/
|
||||
RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER);
|
||||
RS_REFRESH_PEER(84, ExecutorType.RS_REFRESH_PEER),
|
||||
|
||||
/**
|
||||
* RS replay sync replication wal.<br>
|
||||
*
|
||||
* RS_REPLAY_SYNC_REPLICATION_WAL
|
||||
*/
|
||||
RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL);
|
||||
|
||||
private final int code;
|
||||
private final ExecutorType executor;
|
||||
|
|
|
@ -47,7 +47,8 @@ public enum ExecutorType {
|
|||
RS_REGION_REPLICA_FLUSH_OPS (28),
|
||||
RS_COMPACTED_FILES_DISCHARGER (29),
|
||||
RS_OPEN_PRIORITY_REGION (30),
|
||||
RS_REFRESH_PEER (31);
|
||||
RS_REFRESH_PEER(31),
|
||||
RS_REPLAY_SYNC_REPLICATION_WAL(32);
|
||||
|
||||
ExecutorType(int value) {
|
||||
}
|
||||
|
|
|
@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
|
|||
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
|
||||
|
@ -342,6 +343,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// manager of replication
|
||||
private ReplicationPeerManager replicationPeerManager;
|
||||
|
||||
private ReplaySyncReplicationWALManager replaySyncReplicationWALManager;
|
||||
|
||||
// buffer for "fatal error" notices from region servers
|
||||
// in the cluster. This is only used for assisting
|
||||
// operations/debugging.
|
||||
|
@ -849,6 +852,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
initializeMemStoreChunkCreator();
|
||||
this.fileSystemManager = new MasterFileSystem(conf);
|
||||
this.walManager = new MasterWalManager(this);
|
||||
this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this);
|
||||
|
||||
// enable table descriptors cache
|
||||
this.tableDescriptors.setCacheOn();
|
||||
|
@ -3758,4 +3762,9 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
|
||||
return this.snapshotQuotaChore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
|
||||
return this.replaySyncReplicationWALManager;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
|||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
|
@ -460,6 +461,11 @@ public interface MasterServices extends Server {
|
|||
*/
|
||||
ReplicationPeerManager getReplicationPeerManager();
|
||||
|
||||
/**
|
||||
* Returns the {@link ReplaySyncReplicationWALManager}.
|
||||
*/
|
||||
ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager();
|
||||
|
||||
/**
|
||||
* Update the peerConfig for the specified peer
|
||||
* @param peerId a short name that identifies the peer
|
||||
|
|
|
@ -23,7 +23,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
public interface PeerProcedureInterface {
|
||||
|
||||
enum PeerOperationType {
|
||||
ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE
|
||||
ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE,
|
||||
RECOVER_STANDBY, REPLAY_SYNC_REPLICATION_WAL
|
||||
}
|
||||
|
||||
String getPeerId();
|
||||
|
|
|
@ -49,6 +49,7 @@ class PeerQueue extends Queue<String> {
|
|||
}
|
||||
|
||||
private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
|
||||
return proc.getPeerOperationType() != PeerOperationType.REFRESH;
|
||||
return proc.getPeerOperationType() != PeerOperationType.REFRESH
|
||||
&& proc.getPeerOperationType() != PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandbyState> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class);
|
||||
|
||||
public RecoverStandbyProcedure() {
|
||||
}
|
||||
|
||||
public RecoverStandbyProcedure(String peerId) {
|
||||
super(peerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
|
||||
env.getMasterServices().getReplaySyncReplicationWALManager();
|
||||
switch (state) {
|
||||
case RENAME_SYNC_REPLICATION_WALS_DIR:
|
||||
try {
|
||||
replaySyncReplicationWALManager.renamePeerRemoteWALDir(peerId);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e);
|
||||
setFailure("master-recover-standby", e);
|
||||
return Flow.NO_MORE_STATE;
|
||||
}
|
||||
setNextState(RecoverStandbyState.INIT_WORKERS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case INIT_WORKERS:
|
||||
replaySyncReplicationWALManager.initPeerWorkers(peerId);
|
||||
setNextState(RecoverStandbyState.DISPATCH_TASKS);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case DISPATCH_TASKS:
|
||||
addChildProcedure(getReplayWALs(replaySyncReplicationWALManager).stream()
|
||||
.map(wal -> new ReplaySyncReplicationWALProcedure(peerId,
|
||||
replaySyncReplicationWALManager.removeWALRootPath(wal)))
|
||||
.toArray(ReplaySyncReplicationWALProcedure[]::new));
|
||||
setNextState(RecoverStandbyState.REMOVE_SYNC_REPLICATION_WALS_DIR);
|
||||
return Flow.HAS_MORE_STATE;
|
||||
case REMOVE_SYNC_REPLICATION_WALS_DIR:
|
||||
try {
|
||||
replaySyncReplicationWALManager.removePeerReplayWALDir(peerId);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
}
|
||||
|
||||
private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager)
|
||||
throws ProcedureYieldException {
|
||||
try {
|
||||
return replaySyncReplicationWALManager.getReplayWALs(peerId);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RecoverStandbyState getState(int stateId) {
|
||||
return RecoverStandbyState.forNumber(stateId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStateId(RecoverStandbyState state) {
|
||||
return state.getNumber();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RecoverStandbyState getInitialState() {
|
||||
return RecoverStandbyState.RENAME_SYNC_REPLICATION_WALS_DIR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.RECOVER_STANDBY;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class ReplaySyncReplicationWALManager {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
|
||||
|
||||
private static final String REPLAY_SUFFIX = "-replay";
|
||||
|
||||
private final MasterServices services;
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
private final FileSystem fs;
|
||||
|
||||
private final Path walRootDir;
|
||||
|
||||
private final Path remoteWALDir;
|
||||
|
||||
private final Map<String, BlockingQueue<ServerName>> availServers = new HashMap<>();
|
||||
|
||||
public ReplaySyncReplicationWALManager(MasterServices services) {
|
||||
this.services = services;
|
||||
this.conf = services.getConfiguration();
|
||||
this.fs = services.getMasterFileSystem().getWALFileSystem();
|
||||
this.walRootDir = services.getMasterFileSystem().getWALRootDir();
|
||||
this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
|
||||
}
|
||||
|
||||
public Path getPeerRemoteWALDir(String peerId) {
|
||||
return new Path(this.remoteWALDir, peerId);
|
||||
}
|
||||
|
||||
private Path getPeerReplayWALDir(String peerId) {
|
||||
return getPeerRemoteWALDir(peerId).suffix(REPLAY_SUFFIX);
|
||||
}
|
||||
|
||||
public void createPeerRemoteWALDir(String peerId) throws IOException {
|
||||
Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
|
||||
if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
|
||||
throw new IOException("Unable to mkdir " + peerRemoteWALDir);
|
||||
}
|
||||
}
|
||||
|
||||
public void renamePeerRemoteWALDir(String peerId) throws IOException {
|
||||
Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
|
||||
Path peerReplayWALDir = peerRemoteWALDir.suffix(REPLAY_SUFFIX);
|
||||
if (fs.exists(peerRemoteWALDir)) {
|
||||
if (!fs.rename(peerRemoteWALDir, peerReplayWALDir)) {
|
||||
throw new IOException("Failed rename remote wal dir from " + peerRemoteWALDir + " to "
|
||||
+ peerReplayWALDir + " for peer id=" + peerId);
|
||||
}
|
||||
LOG.info("Rename remote wal dir from {} to {} for peer id={}", remoteWALDir, peerReplayWALDir,
|
||||
peerId);
|
||||
} else if (!fs.exists(peerReplayWALDir)) {
|
||||
throw new IOException("Remote wal dir " + peerRemoteWALDir + " and replay wal dir "
|
||||
+ peerReplayWALDir + " not exist for peer id=" + peerId);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Path> getReplayWALs(String peerId) throws IOException {
|
||||
Path peerReplayWALDir = getPeerReplayWALDir(peerId);
|
||||
List<Path> replayWals = new ArrayList<>();
|
||||
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(peerReplayWALDir, false);
|
||||
while (iterator.hasNext()) {
|
||||
replayWals.add(iterator.next().getPath());
|
||||
}
|
||||
return replayWals;
|
||||
}
|
||||
|
||||
public void removePeerReplayWALDir(String peerId) throws IOException {
|
||||
Path peerReplayWALDir = getPeerReplayWALDir(peerId);
|
||||
if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
|
||||
throw new IOException(
|
||||
"Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
|
||||
}
|
||||
}
|
||||
|
||||
public void initPeerWorkers(String peerId) {
|
||||
BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
|
||||
services.getServerManager().getOnlineServers().keySet()
|
||||
.forEach(server -> servers.offer(server));
|
||||
availServers.put(peerId, servers);
|
||||
}
|
||||
|
||||
public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit)
|
||||
throws InterruptedException {
|
||||
return availServers.get(peerId).poll(timeout, unit);
|
||||
}
|
||||
|
||||
public void addAvailServer(String peerId, ServerName server) {
|
||||
availServers.get(peerId).offer(server);
|
||||
}
|
||||
|
||||
public String removeWALRootPath(Path path) {
|
||||
String pathStr = path.toString();
|
||||
// remove the "/" too.
|
||||
return pathStr.substring(walRootDir.toString().length() + 1);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,196 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.replication;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
|
||||
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALStateData;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedureEnv>
|
||||
implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ReplaySyncReplicationWALProcedure.class);
|
||||
|
||||
private static final long DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT = 10000;
|
||||
|
||||
private String peerId;
|
||||
|
||||
private ServerName targetServer = null;
|
||||
|
||||
private String wal;
|
||||
|
||||
private boolean dispatched;
|
||||
|
||||
private ProcedureEvent<?> event;
|
||||
|
||||
private boolean succ;
|
||||
|
||||
public ReplaySyncReplicationWALProcedure() {
|
||||
}
|
||||
|
||||
public ReplaySyncReplicationWALProcedure(String peerId, String wal) {
|
||||
this.peerId = peerId;
|
||||
this.wal = wal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
|
||||
return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
|
||||
ReplaySyncReplicationWALParameter.newBuilder().setPeerId(peerId).setWal(wal).build()
|
||||
.toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
|
||||
complete(env, exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remoteOperationCompleted(MasterProcedureEnv env) {
|
||||
complete(env, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
|
||||
complete(env, error);
|
||||
}
|
||||
|
||||
private void complete(MasterProcedureEnv env, Throwable error) {
|
||||
if (event == null) {
|
||||
LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
|
||||
getProcId());
|
||||
return;
|
||||
}
|
||||
ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
|
||||
env.getMasterServices().getReplaySyncReplicationWALManager();
|
||||
if (error != null) {
|
||||
LOG.warn("Replay sync replication wal {} on {} failed for peer id={}", wal, targetServer,
|
||||
peerId, error);
|
||||
this.succ = false;
|
||||
} else {
|
||||
LOG.warn("Replay sync replication wal {} on {} suceeded for peer id={}", wal, targetServer,
|
||||
peerId);
|
||||
this.succ = true;
|
||||
replaySyncReplicationWALManager.addAvailServer(peerId, targetServer);
|
||||
}
|
||||
event.wake(env.getProcedureScheduler());
|
||||
event = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
if (dispatched) {
|
||||
if (succ) {
|
||||
return null;
|
||||
}
|
||||
// retry
|
||||
dispatched = false;
|
||||
}
|
||||
|
||||
// Try poll a available server
|
||||
if (targetServer == null) {
|
||||
targetServer = env.getMasterServices().getReplaySyncReplicationWALManager()
|
||||
.getAvailServer(peerId, DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
if (targetServer == null) {
|
||||
LOG.info("No available server to replay wal {} for peer id={}, retry", wal, peerId);
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch task to target server
|
||||
try {
|
||||
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
|
||||
} catch (FailedRemoteDispatchException e) {
|
||||
LOG.info(
|
||||
"Can not add remote operation for replay wal {} on {} for peer id={}, " +
|
||||
"this usually because the server is already dead, " + "retry",
|
||||
wal, targetServer, peerId, e);
|
||||
targetServer = null;
|
||||
throw new ProcedureYieldException();
|
||||
}
|
||||
dispatched = true;
|
||||
event = new ProcedureEvent<>(this);
|
||||
event.suspendIfNotReady(this);
|
||||
throw new ProcedureSuspendedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(MasterProcedureEnv env) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
ReplaySyncReplicationWALStateData.Builder builder =
|
||||
ReplaySyncReplicationWALStateData.newBuilder().setPeerId(peerId).setWal(wal);
|
||||
if (targetServer != null) {
|
||||
builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
|
||||
}
|
||||
serializer.serialize(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
|
||||
ReplaySyncReplicationWALStateData data =
|
||||
serializer.deserialize(ReplaySyncReplicationWALStateData.class);
|
||||
peerId = data.getPeerId();
|
||||
wal = data.getWal();
|
||||
if (data.hasTargetServer()) {
|
||||
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPeerId() {
|
||||
return peerId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerOperationType getPeerOperationType() {
|
||||
return PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
|
||||
}
|
||||
}
|
|
@ -1932,6 +1932,11 @@ public class HRegionServer extends HasThread implements
|
|||
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
|
||||
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
|
||||
|
||||
if (conf.getBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false)) {
|
||||
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
|
||||
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 2));
|
||||
}
|
||||
|
||||
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
|
||||
uncaughtExceptionHandler);
|
||||
this.cacheFlusher.start(uncaughtExceptionHandler);
|
||||
|
@ -2877,14 +2882,14 @@ public class HRegionServer extends HasThread implements
|
|||
/**
|
||||
* @return Return the walRootDir.
|
||||
*/
|
||||
protected Path getWALRootDir() {
|
||||
public Path getWALRootDir() {
|
||||
return walRootDir;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return the walFs.
|
||||
*/
|
||||
protected FileSystem getWALFileSystem() {
|
||||
public FileSystem getWALFileSystem() {
|
||||
return walFs;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
|
||||
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
|
||||
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Reader;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
|
||||
|
||||
/**
|
||||
* This callable executed at RS side to replay sync replication wal.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReplaySyncReplicationWALCallable.class);
|
||||
|
||||
private static final String REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE =
|
||||
"hbase.replay.sync.replication.wal.batch.size";
|
||||
|
||||
private static final long DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = 8 * 1024 * 1024;
|
||||
|
||||
private HRegionServer rs;
|
||||
|
||||
private FileSystem fs;
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
private String peerId;
|
||||
|
||||
private String wal;
|
||||
|
||||
private Exception initError;
|
||||
|
||||
private long batchSize;
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
if (initError != null) {
|
||||
throw initError;
|
||||
}
|
||||
LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId);
|
||||
try (Reader reader = getReader()) {
|
||||
List<Entry> entries = readWALEntries(reader);
|
||||
while (!entries.isEmpty()) {
|
||||
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
|
||||
.buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
|
||||
HBaseRpcController controller = new HBaseRpcControllerImpl(pair.getSecond());
|
||||
rs.getRSRpcServices().replicateWALEntry(controller, pair.getFirst());
|
||||
entries = readWALEntries(reader);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(byte[] parameter, HRegionServer rs) {
|
||||
this.rs = rs;
|
||||
this.fs = rs.getWALFileSystem();
|
||||
this.conf = rs.getConfiguration();
|
||||
try {
|
||||
ReplaySyncReplicationWALParameter param =
|
||||
ReplaySyncReplicationWALParameter.parseFrom(parameter);
|
||||
this.peerId = param.getPeerId();
|
||||
this.wal = param.getWal();
|
||||
this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
|
||||
DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
initError = e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventType getEventType() {
|
||||
return EventType.RS_REPLAY_SYNC_REPLICATION_WAL;
|
||||
}
|
||||
|
||||
private Reader getReader() throws IOException {
|
||||
Path path = new Path(rs.getWALRootDir(), wal);
|
||||
long length = rs.getWALFileSystem().getFileStatus(path).getLen();
|
||||
try {
|
||||
FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf);
|
||||
return WALFactory.createReader(rs.getWALFileSystem(), path, rs.getConfiguration());
|
||||
} catch (EOFException e) {
|
||||
if (length <= 0) {
|
||||
LOG.warn("File is empty. Could not open {} for reading because {}", path, e);
|
||||
return null;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private List<Entry> readWALEntries(Reader reader) throws IOException {
|
||||
List<Entry> entries = new ArrayList<>();
|
||||
if (reader == null) {
|
||||
return entries;
|
||||
}
|
||||
long size = 0;
|
||||
Entry entry = reader.next();
|
||||
while (entry != null) {
|
||||
entries.add(entry);
|
||||
size += entry.getEdit().heapSize();
|
||||
if (size > batchSize) {
|
||||
break;
|
||||
}
|
||||
entry = reader.next();
|
||||
}
|
||||
return entries;
|
||||
}
|
||||
}
|
|
@ -41,6 +41,9 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
|
|||
|
||||
@Override
|
||||
public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) {
|
||||
if (info == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
String peerId = mapping.getPeerId(info);
|
||||
if (peerId == null) {
|
||||
return Optional.empty();
|
||||
|
|
|
@ -939,6 +939,11 @@ public abstract class FSUtils extends CommonFSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
|
||||
throws IOException {
|
||||
recoverFileLease(fs, p, conf, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover file lease. Used when a file might be suspect
|
||||
* to be had been left open by another process.
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
|||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||
|
@ -473,4 +474,9 @@ public class MockNoopMasterServices implements MasterServices {
|
|||
SyncReplicationState clusterState) throws ReplicationException, IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,186 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication.master;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
|
||||
import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({MasterTests.class, LargeTests.class})
|
||||
public class TestRecoverStandbyProcedure {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRecoverStandbyProcedure.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestRecoverStandbyProcedure.class);
|
||||
|
||||
private static final TableName tableName = TableName.valueOf("TestRecoverStandbyProcedure");
|
||||
|
||||
private static final RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
|
||||
|
||||
private static final byte[] family = Bytes.toBytes("CF");
|
||||
|
||||
private static final byte[] qualifier = Bytes.toBytes("q");
|
||||
|
||||
private static final long timestamp = System.currentTimeMillis();
|
||||
|
||||
private static final int ROW_COUNT = 1000;
|
||||
|
||||
private static final int WAL_NUMBER = 10;
|
||||
|
||||
private static final int RS_NUMBER = 3;
|
||||
|
||||
private static final String PEER_ID = "1";
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static ReplaySyncReplicationWALManager replaySyncReplicationWALManager;
|
||||
|
||||
private static ProcedureExecutor<MasterProcedureEnv> procExec;
|
||||
|
||||
private static FileSystem fs;
|
||||
|
||||
private static Configuration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
UTIL.getConfiguration().setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true);
|
||||
UTIL.startMiniCluster(RS_NUMBER);
|
||||
UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
|
||||
conf = UTIL.getConfiguration();
|
||||
HMaster master = UTIL.getHBaseCluster().getMaster();
|
||||
fs = master.getMasterFileSystem().getWALFileSystem();
|
||||
replaySyncReplicationWALManager = master.getReplaySyncReplicationWALManager();
|
||||
procExec = master.getMasterProcedureExecutor();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanupTest() throws Exception {
|
||||
try {
|
||||
UTIL.shutdownMiniCluster();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failure shutting down cluster", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupBeforeTest() throws IOException {
|
||||
UTIL.createTable(tableName, family);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownAfterTest() throws IOException {
|
||||
try (Admin admin = UTIL.getAdmin()) {
|
||||
if (admin.isTableEnabled(tableName)) {
|
||||
admin.disableTable(tableName);
|
||||
}
|
||||
admin.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoverStandby() throws IOException, StreamLacksCapabilityException {
|
||||
setupSyncReplicationWALs();
|
||||
long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID));
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId);
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
|
||||
|
||||
try (Table table = UTIL.getConnection().getTable(tableName)) {
|
||||
for (int i = 0; i < WAL_NUMBER * ROW_COUNT; i++) {
|
||||
Result result = table.get(new Get(Bytes.toBytes(i)).setTimestamp(timestamp));
|
||||
assertNotNull(result);
|
||||
assertEquals(i, Bytes.toInt(result.getValue(family, qualifier)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
|
||||
Path peerRemoteWALDir = replaySyncReplicationWALManager.getPeerRemoteWALDir(PEER_ID);
|
||||
if (!fs.exists(peerRemoteWALDir)) {
|
||||
fs.mkdirs(peerRemoteWALDir);
|
||||
}
|
||||
for (int i = 0; i < WAL_NUMBER; i++) {
|
||||
try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
|
||||
Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep");
|
||||
writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir));
|
||||
List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT);
|
||||
for (Entry entry : entries) {
|
||||
writer.append(entry);
|
||||
}
|
||||
writer.sync(false);
|
||||
LOG.info("Created wal {} to replay for peer id={}", wal, PEER_ID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<Entry> setupWALEntries(int startRow, int endRow) {
|
||||
return IntStream.range(startRow, endRow)
|
||||
.mapToObj(i -> createWALEntry(Bytes.toBytes(i), Bytes.toBytes(i)))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private Entry createWALEntry(byte[] row, byte[] value) {
|
||||
WALKeyImpl key = new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, 1);
|
||||
WALEdit edit = new WALEdit();
|
||||
edit.add(new KeyValue(row, family, qualifier, timestamp, value));
|
||||
return new Entry(key, edit);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue