HBASE-16839 Procedure v2 - Move all protobuf handling to ProcedureUtil
This commit is contained in:
parent
13baf4d37a
commit
62c84115ec
|
@ -121,7 +121,7 @@ public class ProcedureInfo implements Cloneable {
|
|||
return procName;
|
||||
}
|
||||
|
||||
private boolean hasOwner() {
|
||||
public boolean hasOwner() {
|
||||
return procOwner != null;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Stable
|
||||
public class BadProcedureException extends HBaseIOException {
|
||||
public BadProcedureException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public BadProcedureException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public BadProcedureException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public BadProcedureException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
|
@ -21,26 +21,19 @@ package org.apache.hadoop.hbase.procedure2;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.NonceKey;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Base Procedure class responsible to handle the Procedure Metadata
|
||||
|
@ -610,7 +603,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
* Called on store load to initialize the Procedure internals after
|
||||
* the creation/deserialization.
|
||||
*/
|
||||
private synchronized void setLastUpdate(final long lastUpdate) {
|
||||
protected synchronized void setLastUpdate(final long lastUpdate) {
|
||||
this.lastUpdate = lastUpdate;
|
||||
}
|
||||
|
||||
|
@ -734,163 +727,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
|
|||
return proc.getProcId();
|
||||
}
|
||||
|
||||
protected static Procedure newInstance(final String className) throws IOException {
|
||||
try {
|
||||
Class<?> clazz = Class.forName(className);
|
||||
if (!Modifier.isPublic(clazz.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " class is not public");
|
||||
}
|
||||
|
||||
Constructor<?> ctor = clazz.getConstructor();
|
||||
assert ctor != null : "no constructor found";
|
||||
if (!Modifier.isPublic(ctor.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " constructor is not public");
|
||||
}
|
||||
return (Procedure)ctor.newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new IOException("The procedure class " + className +
|
||||
" must be accessible and have an empty constructor", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected static void validateClass(final Procedure proc) throws IOException {
|
||||
try {
|
||||
Class<?> clazz = proc.getClass();
|
||||
if (!Modifier.isPublic(clazz.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " class is not public");
|
||||
}
|
||||
|
||||
Constructor<?> ctor = clazz.getConstructor();
|
||||
assert ctor != null;
|
||||
if (!Modifier.isPublic(ctor.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " constructor is not public");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new IOException("The procedure class " + proc.getClass().getName() +
|
||||
" must be accessible and have an empty constructor", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to convert the procedure to protobuf.
|
||||
* Used by ProcedureStore implementations.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static ProcedureProtos.Procedure convert(final Procedure proc)
|
||||
throws IOException {
|
||||
Preconditions.checkArgument(proc != null);
|
||||
validateClass(proc);
|
||||
|
||||
ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
|
||||
.setClassName(proc.getClass().getName())
|
||||
.setProcId(proc.getProcId())
|
||||
.setState(proc.getState())
|
||||
.setStartTime(proc.getStartTime())
|
||||
.setLastUpdate(proc.getLastUpdate());
|
||||
|
||||
if (proc.hasParent()) {
|
||||
builder.setParentId(proc.getParentProcId());
|
||||
}
|
||||
|
||||
if (proc.hasTimeout()) {
|
||||
builder.setTimeout(proc.getTimeout());
|
||||
}
|
||||
|
||||
if (proc.hasOwner()) {
|
||||
builder.setOwner(proc.getOwner());
|
||||
}
|
||||
|
||||
int[] stackIds = proc.getStackIndexes();
|
||||
if (stackIds != null) {
|
||||
for (int i = 0; i < stackIds.length; ++i) {
|
||||
builder.addStackId(stackIds[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if (proc.hasException()) {
|
||||
RemoteProcedureException exception = proc.getException();
|
||||
builder.setException(
|
||||
RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
|
||||
}
|
||||
|
||||
byte[] result = proc.getResult();
|
||||
if (result != null) {
|
||||
builder.setResult(UnsafeByteOperations.unsafeWrap(result));
|
||||
}
|
||||
|
||||
ByteString.Output stateStream = ByteString.newOutput();
|
||||
proc.serializeStateData(stateStream);
|
||||
if (stateStream.size() > 0) {
|
||||
builder.setStateData(stateStream.toByteString());
|
||||
}
|
||||
|
||||
if (proc.getNonceKey() != null) {
|
||||
builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
|
||||
builder.setNonce(proc.getNonceKey().getNonce());
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to convert the protobuf procedure.
|
||||
* Used by ProcedureStore implementations.
|
||||
*
|
||||
* TODO: OPTIMIZATION: some of the field never change during the execution
|
||||
* (e.g. className, procId, parentId, ...).
|
||||
* We can split in 'data' and 'state', and the store
|
||||
* may take advantage of it by storing the data only on insert().
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static Procedure convert(final ProcedureProtos.Procedure proto)
|
||||
throws IOException {
|
||||
// Procedure from class name
|
||||
Procedure proc = Procedure.newInstance(proto.getClassName());
|
||||
|
||||
// set fields
|
||||
proc.setProcId(proto.getProcId());
|
||||
proc.setState(proto.getState());
|
||||
proc.setStartTime(proto.getStartTime());
|
||||
proc.setLastUpdate(proto.getLastUpdate());
|
||||
|
||||
if (proto.hasParentId()) {
|
||||
proc.setParentProcId(proto.getParentId());
|
||||
}
|
||||
|
||||
if (proto.hasOwner()) {
|
||||
proc.setOwner(proto.getOwner());
|
||||
}
|
||||
|
||||
if (proto.hasTimeout()) {
|
||||
proc.setTimeout(proto.getTimeout());
|
||||
}
|
||||
|
||||
if (proto.getStackIdCount() > 0) {
|
||||
proc.setStackIndexes(proto.getStackIdList());
|
||||
}
|
||||
|
||||
if (proto.hasException()) {
|
||||
assert proc.getState() == ProcedureState.FINISHED ||
|
||||
proc.getState() == ProcedureState.ROLLEDBACK :
|
||||
"The procedure must be failed (waiting to rollback) or rolledback";
|
||||
proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
|
||||
}
|
||||
|
||||
if (proto.hasResult()) {
|
||||
proc.setResult(proto.getResult().toByteArray());
|
||||
}
|
||||
|
||||
if (proto.getNonce() != HConstants.NO_NONCE) {
|
||||
NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
|
||||
proc.setNonceKey(nonceKey);
|
||||
}
|
||||
|
||||
// we want to call deserialize even when the stream is empty, mainly for testing.
|
||||
proc.deserializeStateData(proto.getStateData().newInput());
|
||||
|
||||
return proc;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param a the first procedure to be compared.
|
||||
* @param b the second procedure to be compared.
|
||||
|
|
|
@ -649,12 +649,12 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* @return the procedures in a list
|
||||
*/
|
||||
public List<ProcedureInfo> listProcedures() {
|
||||
List<ProcedureInfo> procedureLists =
|
||||
final List<ProcedureInfo> procedureLists =
|
||||
new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
|
||||
for (java.util.Map.Entry<Long, Procedure> p: procedures.entrySet()) {
|
||||
procedureLists.add(ProcedureUtil.createProcedureInfo(p.getValue()));
|
||||
for (Map.Entry<Long, Procedure> p: procedures.entrySet()) {
|
||||
procedureLists.add(ProcedureUtil.convertToProcedureInfo(p.getValue()));
|
||||
}
|
||||
for (java.util.Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
|
||||
for (Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
|
||||
// Note: The procedure could show up twice in the list with different state, as
|
||||
// it could complete after we walk through procedures list and insert into
|
||||
// procedureList - it is ok, as we will use the information in the ProcedureInfo
|
||||
|
@ -1407,7 +1407,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
execCompletionCleanup(proc);
|
||||
|
||||
// update the executor internal state maps
|
||||
ProcedureInfo procInfo = ProcedureUtil.createProcedureInfo(proc, proc.getNonceKey());
|
||||
final ProcedureInfo procInfo = ProcedureUtil.convertToProcedureInfo(proc, proc.getNonceKey());
|
||||
if (!proc.shouldWaitClientAck(getEnvironment())) {
|
||||
procInfo.setClientAckTime(0);
|
||||
}
|
||||
|
|
|
@ -17,10 +17,17 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Modifier;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||
import org.apache.hadoop.hbase.ProcedureState;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||
|
@ -31,14 +38,181 @@ import org.apache.hadoop.hbase.util.NonceKey;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class ProcedureUtil {
|
||||
|
||||
private ProcedureUtil() { }
|
||||
|
||||
// ==========================================================================
|
||||
// Reflection helpers to create/validate a Procedure object
|
||||
// ==========================================================================
|
||||
public static Procedure newProcedure(final String className) throws BadProcedureException {
|
||||
try {
|
||||
final Class<?> clazz = Class.forName(className);
|
||||
if (!Modifier.isPublic(clazz.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " class is not public");
|
||||
}
|
||||
|
||||
final Constructor<?> ctor = clazz.getConstructor();
|
||||
assert ctor != null : "no constructor found";
|
||||
if (!Modifier.isPublic(ctor.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " constructor is not public");
|
||||
}
|
||||
return (Procedure)ctor.newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new BadProcedureException("The procedure class " + className +
|
||||
" must be accessible and have an empty constructor", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void validateClass(final Procedure proc) throws BadProcedureException {
|
||||
try {
|
||||
final Class<?> clazz = proc.getClass();
|
||||
if (!Modifier.isPublic(clazz.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " class is not public");
|
||||
}
|
||||
|
||||
final Constructor<?> ctor = clazz.getConstructor();
|
||||
assert ctor != null;
|
||||
if (!Modifier.isPublic(ctor.getModifiers())) {
|
||||
throw new Exception("the " + clazz + " constructor is not public");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new BadProcedureException("The procedure class " + proc.getClass().getName() +
|
||||
" must be accessible and have an empty constructor", e);
|
||||
}
|
||||
}
|
||||
|
||||
// ==========================================================================
|
||||
// convert to and from Procedure object
|
||||
// ==========================================================================
|
||||
|
||||
/**
|
||||
* Helper to convert the procedure to protobuf.
|
||||
* Used by ProcedureStore implementations.
|
||||
*/
|
||||
public static ProcedureProtos.Procedure convertToProtoProcedure(final Procedure proc)
|
||||
throws IOException {
|
||||
Preconditions.checkArgument(proc != null);
|
||||
validateClass(proc);
|
||||
|
||||
final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
|
||||
.setClassName(proc.getClass().getName())
|
||||
.setProcId(proc.getProcId())
|
||||
.setState(proc.getState())
|
||||
.setStartTime(proc.getStartTime())
|
||||
.setLastUpdate(proc.getLastUpdate());
|
||||
|
||||
if (proc.hasParent()) {
|
||||
builder.setParentId(proc.getParentProcId());
|
||||
}
|
||||
|
||||
if (proc.hasTimeout()) {
|
||||
builder.setTimeout(proc.getTimeout());
|
||||
}
|
||||
|
||||
if (proc.hasOwner()) {
|
||||
builder.setOwner(proc.getOwner());
|
||||
}
|
||||
|
||||
final int[] stackIds = proc.getStackIndexes();
|
||||
if (stackIds != null) {
|
||||
for (int i = 0; i < stackIds.length; ++i) {
|
||||
builder.addStackId(stackIds[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if (proc.hasException()) {
|
||||
RemoteProcedureException exception = proc.getException();
|
||||
builder.setException(
|
||||
RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
|
||||
}
|
||||
|
||||
final byte[] result = proc.getResult();
|
||||
if (result != null) {
|
||||
builder.setResult(UnsafeByteOperations.unsafeWrap(result));
|
||||
}
|
||||
|
||||
final ByteString.Output stateStream = ByteString.newOutput();
|
||||
try {
|
||||
proc.serializeStateData(stateStream);
|
||||
if (stateStream.size() > 0) {
|
||||
builder.setStateData(stateStream.toByteString());
|
||||
}
|
||||
} finally {
|
||||
stateStream.close();
|
||||
}
|
||||
|
||||
if (proc.getNonceKey() != null) {
|
||||
builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
|
||||
builder.setNonce(proc.getNonceKey().getNonce());
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to convert the protobuf procedure.
|
||||
* Used by ProcedureStore implementations.
|
||||
*
|
||||
* TODO: OPTIMIZATION: some of the field never change during the execution
|
||||
* (e.g. className, procId, parentId, ...).
|
||||
* We can split in 'data' and 'state', and the store
|
||||
* may take advantage of it by storing the data only on insert().
|
||||
*/
|
||||
public static Procedure convertToProcedure(final ProcedureProtos.Procedure proto) throws IOException {
|
||||
// Procedure from class name
|
||||
final Procedure proc = newProcedure(proto.getClassName());
|
||||
|
||||
// set fields
|
||||
proc.setProcId(proto.getProcId());
|
||||
proc.setState(proto.getState());
|
||||
proc.setStartTime(proto.getStartTime());
|
||||
proc.setLastUpdate(proto.getLastUpdate());
|
||||
|
||||
if (proto.hasParentId()) {
|
||||
proc.setParentProcId(proto.getParentId());
|
||||
}
|
||||
|
||||
if (proto.hasOwner()) {
|
||||
proc.setOwner(proto.getOwner());
|
||||
}
|
||||
|
||||
if (proto.hasTimeout()) {
|
||||
proc.setTimeout(proto.getTimeout());
|
||||
}
|
||||
|
||||
if (proto.getStackIdCount() > 0) {
|
||||
proc.setStackIndexes(proto.getStackIdList());
|
||||
}
|
||||
|
||||
if (proto.hasException()) {
|
||||
assert proc.getState() == ProcedureProtos.ProcedureState.FINISHED ||
|
||||
proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK :
|
||||
"The procedure must be failed (waiting to rollback) or rolledback";
|
||||
proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
|
||||
}
|
||||
|
||||
if (proto.hasResult()) {
|
||||
proc.setResult(proto.getResult().toByteArray());
|
||||
}
|
||||
|
||||
if (proto.getNonce() != HConstants.NO_NONCE) {
|
||||
proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
|
||||
}
|
||||
|
||||
// we want to call deserialize even when the stream is empty, mainly for testing.
|
||||
proc.deserializeStateData(proto.getStateData().newInput());
|
||||
|
||||
return proc;
|
||||
}
|
||||
|
||||
// ==========================================================================
|
||||
// convert to and from ProcedureInfo object
|
||||
// ==========================================================================
|
||||
|
||||
/**
|
||||
* @return Convert the current {@link ProcedureInfo} into a Protocol Buffers Procedure
|
||||
* instance.
|
||||
*/
|
||||
public static ProcedureProtos.Procedure convertToProcedureProto(final ProcedureInfo procInfo) {
|
||||
public static ProcedureProtos.Procedure convertToProtoProcedure(final ProcedureInfo procInfo) {
|
||||
final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder();
|
||||
|
||||
builder.setClassName(procInfo.getProcName());
|
||||
|
@ -51,7 +225,7 @@ public final class ProcedureUtil {
|
|||
builder.setParentId(procInfo.getParentId());
|
||||
}
|
||||
|
||||
if (procInfo.getProcOwner() != null) {
|
||||
if (procInfo.hasOwner()) {
|
||||
builder.setOwner(procInfo.getProcOwner());
|
||||
}
|
||||
|
||||
|
@ -71,13 +245,14 @@ public final class ProcedureUtil {
|
|||
* @return Convert the current Protocol Buffers Procedure to {@link ProcedureInfo}
|
||||
* instance.
|
||||
*/
|
||||
public static ProcedureInfo convert(final ProcedureProtos.Procedure procProto) {
|
||||
public static ProcedureInfo convertToProcedureInfo(final ProcedureProtos.Procedure procProto) {
|
||||
NonceKey nonceKey = null;
|
||||
if (procProto.getNonce() != HConstants.NO_NONCE) {
|
||||
nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce());
|
||||
}
|
||||
|
||||
return new ProcedureInfo(procProto.getProcId(), procProto.getClassName(), procProto.getOwner(),
|
||||
return new ProcedureInfo(procProto.getProcId(), procProto.getClassName(),
|
||||
procProto.hasOwner() ? procProto.getOwner() : null,
|
||||
convertToProcedureState(procProto.getState()),
|
||||
procProto.hasParentId() ? procProto.getParentId() : -1, nonceKey,
|
||||
procProto.hasException() ?
|
||||
|
@ -90,14 +265,15 @@ public final class ProcedureUtil {
|
|||
return ProcedureState.valueOf(state.name());
|
||||
}
|
||||
|
||||
public static ProcedureInfo createProcedureInfo(final Procedure proc) {
|
||||
return createProcedureInfo(proc, null);
|
||||
public static ProcedureInfo convertToProcedureInfo(final Procedure proc) {
|
||||
return convertToProcedureInfo(proc, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to create the ProcedureInfo from Procedure.
|
||||
*/
|
||||
public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
|
||||
public static ProcedureInfo convertToProcedureInfo(final Procedure proc,
|
||||
final NonceKey nonceKey) {
|
||||
final RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
|
||||
return new ProcedureInfo(proc.getProcId(), proc.toStringClass(), proc.getOwner(),
|
||||
convertToProcedureState(proc.getState()),
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.io.util.StreamUtils;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
|
||||
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
|
||||
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
|
||||
|
@ -205,12 +206,12 @@ public final class ProcedureWALFormat {
|
|||
|
||||
public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type,
|
||||
Procedure proc, Procedure[] subprocs) throws IOException {
|
||||
ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
|
||||
final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
|
||||
builder.setType(type);
|
||||
builder.addProcedure(Procedure.convert(proc));
|
||||
builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
|
||||
if (subprocs != null) {
|
||||
for (int i = 0; i < subprocs.length; ++i) {
|
||||
builder.addProcedure(Procedure.convert(subprocs[i]));
|
||||
builder.addProcedure(ProcedureUtil.convertToProtoProcedure(subprocs[i]));
|
||||
}
|
||||
}
|
||||
builder.build().writeDelimitedTo(slot);
|
||||
|
@ -233,7 +234,7 @@ public final class ProcedureWALFormat {
|
|||
|
||||
public static void writeDelete(ByteSlot slot, long procId)
|
||||
throws IOException {
|
||||
ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
|
||||
final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
|
||||
builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
|
||||
builder.setProcId(procId);
|
||||
builder.build().writeDelimitedTo(slot);
|
||||
|
@ -241,11 +242,11 @@ public final class ProcedureWALFormat {
|
|||
|
||||
public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs)
|
||||
throws IOException {
|
||||
ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
|
||||
final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder();
|
||||
builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE);
|
||||
builder.setProcId(proc.getProcId());
|
||||
if (subprocs != null) {
|
||||
builder.addProcedure(Procedure.convert(proc));
|
||||
builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc));
|
||||
for (int i = 0; i < subprocs.length; ++i) {
|
||||
builder.addChildId(subprocs[i]);
|
||||
}
|
||||
|
|
|
@ -339,13 +339,13 @@ public class ProcedureWALFormatReader {
|
|||
|
||||
public Procedure convert() throws IOException {
|
||||
if (procedure == null) {
|
||||
procedure = Procedure.convert(proto);
|
||||
procedure = ProcedureUtil.convertToProcedure(proto);
|
||||
}
|
||||
return procedure;
|
||||
}
|
||||
|
||||
public ProcedureInfo convertToInfo() {
|
||||
return ProcedureUtil.convert(proto);
|
||||
return ProcedureUtil.convertToProcedureInfo(proto);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.CommandLineParser;
|
||||
import org.apache.commons.cli.HelpFormatter;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
|
@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
|
@ -118,11 +118,11 @@ public class ProcedureWALPrettyPrinter extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
private void printEntry(ProcedureWALEntry entry) throws IOException {
|
||||
private void printEntry(final ProcedureWALEntry entry) throws IOException {
|
||||
out.println("EntryType=" + entry.getType());
|
||||
int procCount = entry.getProcedureCount();
|
||||
for (int i = 0; i < procCount; i++) {
|
||||
Procedure<?> proc = Procedure.convert(entry.getProcedure(i));
|
||||
Procedure<?> proc = ProcedureUtil.convertToProcedure(entry.getProcedure(i));
|
||||
printProcedure(proc);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestProcedureUtil {
|
||||
private static final Log LOG = LogFactory.getLog(TestProcedureUtil.class);
|
||||
|
||||
@Test
|
||||
public void testValidation() throws Exception {
|
||||
ProcedureUtil.validateClass(new TestProcedure(10));
|
||||
}
|
||||
|
||||
@Test(expected = BadProcedureException.class)
|
||||
public void testNoDefaultConstructorValidation() throws Exception {
|
||||
ProcedureUtil.validateClass(new TestProcedureNoDefaultConstructor(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvert() throws Exception {
|
||||
// check Procedure to protobuf conversion
|
||||
final TestProcedure proc1 = new TestProcedure(10);
|
||||
final ProcedureProtos.Procedure proto1 = ProcedureUtil.convertToProtoProcedure(proc1);
|
||||
final TestProcedure proc2 = (TestProcedure)ProcedureUtil.convertToProcedure(proto1);
|
||||
final ProcedureProtos.Procedure proto2 = ProcedureUtil.convertToProtoProcedure(proc2);
|
||||
assertEquals(false, proto2.hasResult());
|
||||
assertEquals("Procedure protobuf does not match", proto1, proto2);
|
||||
|
||||
// remove the state-data from the procedure protobuf to compare it to the gen ProcedureInfo
|
||||
final ProcedureProtos.Procedure pbproc = proto2.toBuilder().clearStateData().build();
|
||||
|
||||
// check ProcedureInfo to protobuf conversion
|
||||
final ProcedureInfo protoInfo1 = ProcedureUtil.convertToProcedureInfo(proc1);
|
||||
final ProcedureProtos.Procedure proto3 = ProcedureUtil.convertToProtoProcedure(protoInfo1);
|
||||
final ProcedureInfo protoInfo2 = ProcedureUtil.convertToProcedureInfo(proto3);
|
||||
final ProcedureProtos.Procedure proto4 = ProcedureUtil.convertToProtoProcedure(protoInfo2);
|
||||
assertEquals("ProcedureInfo protobuf does not match", proto3, proto4);
|
||||
assertEquals("ProcedureInfo/Procedure protobuf does not match", pbproc, proto3);
|
||||
assertEquals("ProcedureInfo/Procedure protobuf does not match", pbproc, proto4);
|
||||
}
|
||||
|
||||
public static class TestProcedureNoDefaultConstructor extends TestProcedure {
|
||||
public TestProcedureNoDefaultConstructor(int x) {}
|
||||
}
|
||||
}
|
|
@ -973,10 +973,9 @@ public class MasterRpcServices extends RSRpcServices
|
|||
RpcController rpcController,
|
||||
ListProceduresRequest request) throws ServiceException {
|
||||
try {
|
||||
ListProceduresResponse.Builder response =
|
||||
ListProceduresResponse.newBuilder();
|
||||
final ListProceduresResponse.Builder response = ListProceduresResponse.newBuilder();
|
||||
for (ProcedureInfo p: master.listProcedures()) {
|
||||
response.addProcedure(ProcedureUtil.convertToProcedureProto(p));
|
||||
response.addProcedure(ProcedureUtil.convertToProtoProcedure(p));
|
||||
}
|
||||
return response.build();
|
||||
} catch (IOException e) {
|
||||
|
|
Loading…
Reference in New Issue