HDDS-1154. Add tracing to the client side of StorageContainerLocationProtocol and OzoneManagerProtocol.

Contributed by Elek, Marton.
Anu Engineer 2019-02-24 13:13:37 -08:00
parent 021f8deef0
commit 8387bbdfb5
13 changed files with 90 additions and 73 deletions
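The change below retypes client fields as the protocol interfaces (StorageContainerLocationProtocol, OzoneManagerProtocol) and wraps the PB translators with TracingUtil.createProxy, so a span is opened per RPC on the client side. TracingUtil itself is not part of this diff; the following is only a minimal sketch of the dynamic-proxy approach such a helper can take (the class name TracingProxySketch and the span-naming scheme are illustrative, not the project's code):

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;

/**
 * Sketch of a tracing proxy in the spirit of TracingUtil.createProxy:
 * every call on the returned interface is wrapped in an OpenTracing span
 * named after the invoked method, then delegated to the real client.
 */
public final class TracingProxySketch {

  private TracingProxySketch() {
  }

  @SuppressWarnings("unchecked")
  public static <T> T createProxy(T delegate, Class<T> interfaceClass) {
    return (T) Proxy.newProxyInstance(
        interfaceClass.getClassLoader(),
        new Class<?>[] {interfaceClass},
        (proxy, method, args) -> {
          Scope scope = GlobalTracer.get()
              .buildSpan(interfaceClass.getSimpleName()
                  + "." + method.getName())
              .startActive(true);
          try {
            return method.invoke(delegate, args);
          } catch (InvocationTargetException e) {
            throw e.getCause();
          } finally {
            scope.close();
          }
        });
  }
}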

View File

@ -23,8 +23,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto;
@ -48,12 +47,12 @@ public class ContainerOperationClient implements ScmClient {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerOperationClient.class);
private static long containerSizeB = -1;
private final StorageContainerLocationProtocolClientSideTranslatorPB
private final StorageContainerLocationProtocol
storageContainerLocationClient;
private final XceiverClientManager xceiverClientManager;
public ContainerOperationClient(
StorageContainerLocationProtocolClientSideTranslatorPB
StorageContainerLocationProtocol
storageContainerLocationClient,
XceiverClientManager xceiverClientManager) {
this.storageContainerLocationClient = storageContainerLocationClient;

View File

@ -18,14 +18,6 @@
package org.apache.hadoop.hdds.scm;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@ -34,6 +26,13 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import com.google.common.annotations.VisibleForTesting;
/**
* A Client for the storageContainer protocol.
*/

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.security.KerberosInfo;
@ -35,7 +36,7 @@
* that currently host a container.
*/
@KerberosInfo(serverPrincipal = ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
public interface StorageContainerLocationProtocol {
public interface StorageContainerLocationProtocol extends Closeable {
/**
* Asks SCM where a container should be allocated. SCM responds with the
* set of datanodes that should be used creating this container.
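Extending the interface with Closeable is what lets callers that only see the interface type (including the tracing proxy) release the underlying RPC proxy. A hypothetical caller, not part of this commit, illustrating the effect:

import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Illustrative holder: with the interface extending Closeable, the client
 * can be released without casting back to the ClientSideTranslatorPB class.
 */
public class ScmLocationClientHolder implements AutoCloseable {

  private static final Logger LOG =
      LoggerFactory.getLogger(ScmLocationClientHolder.class);

  private final StorageContainerLocationProtocol scmLocationClient;

  public ScmLocationClientHolder(StorageContainerLocationProtocol client) {
    this.scmLocationClient = client;
  }

  @Override
  public void close() {
    // Logs and swallows any IOException thrown by the client's close().
    IOUtils.cleanupWithLogger(LOG, scmLocationClient);
  }
}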

View File

@ -552,4 +552,9 @@ public AuditMessage buildAuditMessageForFailure(AuditAction op, Map<String,
.withException(throwable)
.build();
}
@Override
public void close() throws IOException {
stop();
}
}

View File

@ -37,9 +37,11 @@
import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@ -134,12 +136,14 @@ public ScmClient createScmClient()
RPC.setProtocolEngine(ozoneConf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
StorageContainerLocationProtocolClientSideTranslatorPB client =
StorageContainerLocationProtocol client =
TracingUtil.createProxy(
new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
scmAddress, UserGroupInformation.getCurrentUser(), ozoneConf,
NetUtils.getDefaultSocketFactory(ozoneConf),
Client.getRpcTimeout(ozoneConf)));
Client.getRpcTimeout(ozoneConf))),
StorageContainerLocationProtocol.class);
return new ContainerOperationClient(
client, new XceiverClientManager(ozoneConf));
}

View File

@ -25,11 +25,11 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.security.UserGroupInformation;
@ -263,7 +263,7 @@ public boolean seekToNewSource(long targetPos) throws IOException {
public static LengthInputStream getFromOmKeyInfo(
OmKeyInfo keyInfo,
XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB
StorageContainerLocationProtocol
storageContainerLocationClient,
String requestId, boolean verifyChecksum) throws IOException {
long length = 0;

View File

@ -26,16 +26,15 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.helpers.*;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.RaftRetryFailureException;
@ -68,9 +67,8 @@ public class KeyOutputStream extends OutputStream {
// array list's get(index) is O(1)
private final ArrayList<BlockOutputStreamEntry> streamEntries;
private int currentStreamIndex;
private final OzoneManagerProtocolClientSideTranslatorPB omClient;
private final
StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
private final OzoneManagerProtocol omClient;
private final StorageContainerLocationProtocol scmClient;
private final OmKeyArgs keyArgs;
private final long openID;
private final XceiverClientManager xceiverClientManager;
@ -144,8 +142,8 @@ public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
@SuppressWarnings("parameternumber")
public KeyOutputStream(OpenKeySession handler,
XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
StorageContainerLocationProtocol scmClient,
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationFactor factor, ReplicationType type,
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
ChecksumType checksumType, int bytesPerChecksum,
@ -572,8 +570,8 @@ public FileEncryptionInfo getFileEncryptionInfo() {
public static class Builder {
private OpenKeySession openHandler;
private XceiverClientManager xceiverManager;
private StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
private OzoneManagerProtocolClientSideTranslatorPB omClient;
private StorageContainerLocationProtocol scmClient;
private OzoneManagerProtocol omClient;
private int chunkSize;
private String requestID;
private ReplicationType type;
@ -609,14 +607,13 @@ public Builder setXceiverClientManager(XceiverClientManager manager) {
return this;
}
public Builder setScmClient(
StorageContainerLocationProtocolClientSideTranslatorPB client) {
public Builder setScmClient(StorageContainerLocationProtocol client) {
this.scmClient = client;
return this;
}
public Builder setOmClient(
OzoneManagerProtocolClientSideTranslatorPB client) {
OzoneManagerProtocol client) {
this.omClient = client;
return this;
}

View File

@ -31,6 +31,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -63,7 +65,7 @@
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneAcl;
@ -103,11 +105,10 @@ public class RpcClient implements ClientProtocol {
LoggerFactory.getLogger(RpcClient.class);
private final OzoneConfiguration conf;
private final StorageContainerLocationProtocolClientSideTranslatorPB
private final StorageContainerLocationProtocol
storageContainerLocationClient;
private final OMProxyProvider omProxyProvider;
private final OzoneManagerProtocolClientSideTranslatorPB
ozoneManagerClient;
private final OzoneManagerProtocol ozoneManagerClient;
private final XceiverClientManager xceiverClientManager;
private final int chunkSize;
private final ChecksumType checksumType;
@ -137,19 +138,24 @@ public RpcClient(Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
ProtobufRpcEngine.class);
this.omProxyProvider = new OMProxyProvider(conf, ugi);
this.ozoneManagerClient = this.omProxyProvider.getProxy();
this.ozoneManagerClient =
TracingUtil.createProxy(
this.omProxyProvider.getProxy(),
OzoneManagerProtocol.class);
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
InetSocketAddress scmAddress = getScmAddressForClient();
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
this.storageContainerLocationClient =
StorageContainerLocationProtocolClientSideTranslatorPB client =
new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
scmAddress, ugi, conf, NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
this.storageContainerLocationClient =
TracingUtil.createProxy(client, StorageContainerLocationProtocol.class);
this.xceiverClientManager = new XceiverClientManager(conf);
int configuredChunkSize = (int) conf

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.security.KerberosInfo;
@ -43,7 +44,8 @@
*/
@KerberosInfo(
serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
public interface OzoneManagerProtocol extends OzoneManagerSecurityProtocol {
public interface OzoneManagerProtocol
extends OzoneManagerSecurityProtocol, Closeable {
/**
* Creates a volume.

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.om.protocolPB;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -112,8 +111,6 @@
import com.google.common.base.Strings;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
@ -127,7 +124,7 @@
@InterfaceAudience.Private
public final class OzoneManagerProtocolClientSideTranslatorPB
implements OzoneManagerProtocol, ProtocolTranslator, Closeable {
implements OzoneManagerProtocol, ProtocolTranslator {
/**
* RpcController is not used and hence is set to null.
@ -194,9 +191,6 @@ private OMRequest.Builder createOMRequest(Type cmdType) {
*/
private OMResponse submitRequest(OMRequest omRequest)
throws IOException {
Scope scope =
GlobalTracer.get().buildSpan(omRequest.getCmdType().name())
.startActive(true);
try {
OMRequest payload = OMRequest.newBuilder(omRequest)
.setTraceID(TracingUtil.exportCurrentSpan())
@ -204,8 +198,6 @@ private OMResponse submitRequest(OMRequest omRequest)
return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
} finally {
scope.close();
}
}
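With the span now opened by the TracingUtil proxy around OzoneManagerProtocol, the translator only propagates the span id to the server. The resulting submitRequest, reconstructed from the hunk above (the trailing .build() call sits outside the shown context and is assumed):

  private OMResponse submitRequest(OMRequest omRequest)
      throws IOException {
    try {
      // The active span is created by the tracing proxy; only its id is
      // exported into the request so the server can join the trace.
      OMRequest payload = OMRequest.newBuilder(omRequest)
          .setTraceID(TracingUtil.exportCurrentSpan())
          .build();
      return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }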

View File

@ -20,15 +20,18 @@
import com.sun.jersey.api.core.ApplicationAdapter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.web.ObjectStoreApplication;
@ -66,9 +69,8 @@ public final class ObjectStoreHandler implements Closeable {
LoggerFactory.getLogger(ObjectStoreHandler.class);
private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
private final OzoneManagerProtocolClientSideTranslatorPB
ozoneManagerClient;
private final StorageContainerLocationProtocolClientSideTranslatorPB
private final OzoneManagerProtocol ozoneManagerClient;
private final StorageContainerLocationProtocol
storageContainerLocationClient;
private final ScmBlockLocationProtocolClientSideTranslatorPB
scmBlockLocationClient;
@ -94,11 +96,14 @@ public ObjectStoreHandler(Configuration conf) throws IOException {
InetSocketAddress scmAddress =
getScmAddressForClients(conf);
this.storageContainerLocationClient =
new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
scmAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
TracingUtil.createProxy(
new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class,
scmVersion,
scmAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf))),
StorageContainerLocationProtocol.class);
InetSocketAddress scmBlockAddress =
getScmAddressForBlockClients(conf);
@ -115,15 +120,18 @@ public ObjectStoreHandler(Configuration conf) throws IOException {
RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
InetSocketAddress omAddress = getOmAddress(conf);
this.ozoneManagerClient =
new OzoneManagerProtocolClientSideTranslatorPB(
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
omAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)), clientId.toString());
TracingUtil.createProxy(
new OzoneManagerProtocolClientSideTranslatorPB(
RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
omAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)), clientId.toString()),
OzoneManagerProtocol.class);
storageHandler = new DistributedStorageHandler(
new OzoneConfiguration(conf),
this.storageContainerLocationClient,
TracingUtil.createProxy(storageContainerLocationClient,
StorageContainerLocationProtocol.class),
this.ozoneManagerClient);
ApplicationAdapter aa =
new ApplicationAdapter(new ObjectStoreApplication());

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@ -35,7 +36,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
@ -47,8 +48,6 @@
import org.apache.hadoop.ozone.web.request.OzoneQuota;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
@ -74,9 +73,9 @@ public final class DistributedStorageHandler implements StorageHandler {
private static final Logger LOG =
LoggerFactory.getLogger(DistributedStorageHandler.class);
private final StorageContainerLocationProtocolClientSideTranslatorPB
private final StorageContainerLocationProtocol
storageContainerLocationClient;
private final OzoneManagerProtocolClientSideTranslatorPB
private final OzoneManagerProtocol
ozoneManagerClient;
private final XceiverClientManager xceiverClientManager;
private final OzoneAcl.OzoneACLRights userRights;
@ -98,10 +97,8 @@ public final class DistributedStorageHandler implements StorageHandler {
* @param ozoneManagerClient OzoneManager proxy
*/
public DistributedStorageHandler(OzoneConfiguration conf,
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocation,
OzoneManagerProtocolClientSideTranslatorPB
ozoneManagerClient) {
StorageContainerLocationProtocol storageContainerLocation,
OzoneManagerProtocol ozoneManagerClient) {
this.ozoneManagerClient = ozoneManagerClient;
this.storageContainerLocationClient = storageContainerLocation;
this.xceiverClientManager = new XceiverClientManager(conf);

View File

@ -512,6 +512,11 @@ private IAccessAuthorizer getACLAuthorizerInstance(OzoneConfiguration conf) {
return ReflectionUtils.newInstance(clazz, conf);
}
@Override
public void close() throws IOException {
stop();
}
/**
* Class which schedule saving metrics to a file.
*/
@ -736,12 +741,14 @@ private static StorageContainerLocationProtocol getScmContainerClient(
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
InetSocketAddress scmAddr = getScmAddressForClients(
conf);
StorageContainerLocationProtocolClientSideTranslatorPB scmContainerClient =
new StorageContainerLocationProtocolClientSideTranslatorPB(
StorageContainerLocationProtocol scmContainerClient =
TracingUtil.createProxy(
new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
scmAddr, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
Client.getRpcTimeout(conf))),
StorageContainerLocationProtocol.class);
return scmContainerClient;
}