527 lines
14 KiB
Go
527 lines
14 KiB
Go
// Copyright 2018, OpenCensus Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package ocagent
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"google.golang.org/api/support/bundler"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
"go.opencensus.io/plugin/ocgrpc"
|
|
"go.opencensus.io/resource"
|
|
"go.opencensus.io/stats/view"
|
|
"go.opencensus.io/trace"
|
|
|
|
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
|
|
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
|
|
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
|
|
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
|
|
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
|
|
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
|
|
)
|
|
|
|
var startupMu sync.Mutex
|
|
var startTime time.Time
|
|
|
|
func init() {
|
|
startupMu.Lock()
|
|
startTime = time.Now()
|
|
startupMu.Unlock()
|
|
}
|
|
|
|
var _ trace.Exporter = (*Exporter)(nil)
|
|
var _ view.Exporter = (*Exporter)(nil)
|
|
|
|
type Exporter struct {
|
|
// mu protects the non-atomic and non-channel variables
|
|
mu sync.RWMutex
|
|
// senderMu protects the concurrent unsafe send on traceExporter client
|
|
senderMu sync.Mutex
|
|
// recvMu protects the concurrent unsafe recv on traceExporter client
|
|
recvMu sync.Mutex
|
|
started bool
|
|
stopped bool
|
|
agentAddress string
|
|
serviceName string
|
|
canDialInsecure bool
|
|
traceExporter agenttracepb.TraceService_ExportClient
|
|
metricsExporter agentmetricspb.MetricsService_ExportClient
|
|
nodeInfo *commonpb.Node
|
|
grpcClientConn *grpc.ClientConn
|
|
reconnectionPeriod time.Duration
|
|
resource *resourcepb.Resource
|
|
compressor string
|
|
headers map[string]string
|
|
lastConnectErrPtr unsafe.Pointer
|
|
|
|
startOnce sync.Once
|
|
stopCh chan bool
|
|
disconnectedCh chan bool
|
|
|
|
backgroundConnectionDoneCh chan bool
|
|
|
|
traceBundler *bundler.Bundler
|
|
|
|
// viewDataBundler is the bundler to enable conversion
|
|
// from OpenCensus-Go view.Data to metricspb.Metric.
|
|
// Please do not confuse it with metricsBundler!
|
|
viewDataBundler *bundler.Bundler
|
|
|
|
clientTransportCredentials credentials.TransportCredentials
|
|
|
|
grpcDialOptions []grpc.DialOption
|
|
}
|
|
|
|
func NewExporter(opts ...ExporterOption) (*Exporter, error) {
|
|
exp, err := NewUnstartedExporter(opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := exp.Start(); err != nil {
|
|
return nil, err
|
|
}
|
|
return exp, nil
|
|
}
|
|
|
|
const spanDataBufferSize = 300
|
|
|
|
func NewUnstartedExporter(opts ...ExporterOption) (*Exporter, error) {
|
|
e := new(Exporter)
|
|
for _, opt := range opts {
|
|
opt.withExporter(e)
|
|
}
|
|
traceBundler := bundler.NewBundler((*trace.SpanData)(nil), func(bundle interface{}) {
|
|
e.uploadTraces(bundle.([]*trace.SpanData))
|
|
})
|
|
traceBundler.DelayThreshold = 2 * time.Second
|
|
traceBundler.BundleCountThreshold = spanDataBufferSize
|
|
e.traceBundler = traceBundler
|
|
|
|
viewDataBundler := bundler.NewBundler((*view.Data)(nil), func(bundle interface{}) {
|
|
e.uploadViewData(bundle.([]*view.Data))
|
|
})
|
|
viewDataBundler.DelayThreshold = 2 * time.Second
|
|
viewDataBundler.BundleCountThreshold = 500 // TODO: (@odeke-em) make this configurable.
|
|
e.viewDataBundler = viewDataBundler
|
|
e.nodeInfo = NodeWithStartTime(e.serviceName)
|
|
e.resource = resourceProtoFromEnv()
|
|
|
|
return e, nil
|
|
}
|
|
|
|
const (
|
|
maxInitialConfigRetries = 10
|
|
maxInitialTracesRetries = 10
|
|
)
|
|
|
|
var (
|
|
errAlreadyStarted = errors.New("already started")
|
|
errNotStarted = errors.New("not started")
|
|
errStopped = errors.New("stopped")
|
|
)
|
|
|
|
// Start dials to the agent, establishing a connection to it. It also
|
|
// initiates the Config and Trace services by sending over the initial
|
|
// messages that consist of the node identifier. Start invokes a background
|
|
// connector that will reattempt connections to the agent periodically
|
|
// if the connection dies.
|
|
func (ae *Exporter) Start() error {
|
|
var err = errAlreadyStarted
|
|
ae.startOnce.Do(func() {
|
|
ae.mu.Lock()
|
|
ae.started = true
|
|
ae.disconnectedCh = make(chan bool, 1)
|
|
ae.stopCh = make(chan bool)
|
|
ae.backgroundConnectionDoneCh = make(chan bool)
|
|
ae.mu.Unlock()
|
|
|
|
// An optimistic first connection attempt to ensure that
|
|
// applications under heavy load can immediately process
|
|
// data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63
|
|
if err := ae.connect(); err == nil {
|
|
ae.setStateConnected()
|
|
} else {
|
|
ae.setStateDisconnected(err)
|
|
}
|
|
go ae.indefiniteBackgroundConnection()
|
|
|
|
err = nil
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
func (ae *Exporter) prepareAgentAddress() string {
|
|
if ae.agentAddress != "" {
|
|
return ae.agentAddress
|
|
}
|
|
return fmt.Sprintf("%s:%d", DefaultAgentHost, DefaultAgentPort)
|
|
}
|
|
|
|
func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error {
|
|
ae.mu.RLock()
|
|
started := ae.started
|
|
nodeInfo := ae.nodeInfo
|
|
ae.mu.RUnlock()
|
|
|
|
if !started {
|
|
return errNotStarted
|
|
}
|
|
|
|
ae.mu.Lock()
|
|
// If the previous clientConn was non-nil, close it
|
|
if ae.grpcClientConn != nil {
|
|
_ = ae.grpcClientConn.Close()
|
|
}
|
|
ae.grpcClientConn = cc
|
|
ae.mu.Unlock()
|
|
|
|
if err := ae.createTraceServiceConnection(ae.grpcClientConn, nodeInfo); err != nil {
|
|
return err
|
|
}
|
|
|
|
return ae.createMetricsServiceConnection(ae.grpcClientConn, nodeInfo)
|
|
}
|
|
|
|
func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
|
|
// Initiate the trace service by sending over node identifier info.
|
|
traceSvcClient := agenttracepb.NewTraceServiceClient(cc)
|
|
ctx := context.Background()
|
|
if len(ae.headers) > 0 {
|
|
ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
|
|
}
|
|
traceExporter, err := traceSvcClient.Export(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Exporter.Start:: TraceServiceClient: %v", err)
|
|
}
|
|
|
|
firstTraceMessage := &agenttracepb.ExportTraceServiceRequest{
|
|
Node: node,
|
|
Resource: ae.resource,
|
|
}
|
|
if err := traceExporter.Send(firstTraceMessage); err != nil {
|
|
return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err)
|
|
}
|
|
|
|
ae.mu.Lock()
|
|
ae.traceExporter = traceExporter
|
|
ae.mu.Unlock()
|
|
|
|
// Initiate the config service by sending over node identifier info.
|
|
configStream, err := traceSvcClient.Config(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("Exporter.Start:: ConfigStream: %v", err)
|
|
}
|
|
firstCfgMessage := &agenttracepb.CurrentLibraryConfig{Node: node}
|
|
if err := configStream.Send(firstCfgMessage); err != nil {
|
|
return fmt.Errorf("Exporter.Start:: Failed to initiate the Config service: %v", err)
|
|
}
|
|
|
|
// In the background, handle trace configurations that are beamed down
|
|
// by the agent, but also reply to it with the applied configuration.
|
|
go ae.handleConfigStreaming(configStream)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ae *Exporter) createMetricsServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
|
|
metricsSvcClient := agentmetricspb.NewMetricsServiceClient(cc)
|
|
metricsExporter, err := metricsSvcClient.Export(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("MetricsExporter: failed to start the service client: %v", err)
|
|
}
|
|
// Initiate the metrics service by sending over the first message just containing the Node and Resource.
|
|
firstMetricsMessage := &agentmetricspb.ExportMetricsServiceRequest{
|
|
Node: node,
|
|
Resource: ae.resource,
|
|
}
|
|
if err := metricsExporter.Send(firstMetricsMessage); err != nil {
|
|
return fmt.Errorf("MetricsExporter:: failed to send the first message: %v", err)
|
|
}
|
|
|
|
ae.mu.Lock()
|
|
ae.metricsExporter = metricsExporter
|
|
ae.mu.Unlock()
|
|
|
|
// With that we are good to go and can start sending metrics
|
|
return nil
|
|
}
|
|
|
|
func (ae *Exporter) dialToAgent() (*grpc.ClientConn, error) {
|
|
addr := ae.prepareAgentAddress()
|
|
var dialOpts []grpc.DialOption
|
|
if ae.clientTransportCredentials != nil {
|
|
dialOpts = append(dialOpts, grpc.WithTransportCredentials(ae.clientTransportCredentials))
|
|
} else if ae.canDialInsecure {
|
|
dialOpts = append(dialOpts, grpc.WithInsecure())
|
|
}
|
|
if ae.compressor != "" {
|
|
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(ae.compressor)))
|
|
}
|
|
dialOpts = append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
|
|
if len(ae.grpcDialOptions) != 0 {
|
|
dialOpts = append(dialOpts, ae.grpcDialOptions...)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
if len(ae.headers) > 0 {
|
|
ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
|
|
}
|
|
return grpc.DialContext(ctx, addr, dialOpts...)
|
|
}
|
|
|
|
func (ae *Exporter) handleConfigStreaming(configStream agenttracepb.TraceService_ConfigClient) error {
|
|
// Note: We haven't yet implemented configuration sending so we
|
|
// should NOT be changing connection states within this function for now.
|
|
for {
|
|
recv, err := configStream.Recv()
|
|
if err != nil {
|
|
// TODO: Check if this is a transient error or exponential backoff-able.
|
|
return err
|
|
}
|
|
cfg := recv.Config
|
|
if cfg == nil {
|
|
continue
|
|
}
|
|
|
|
// Otherwise now apply the trace configuration sent down from the agent
|
|
if psamp := cfg.GetProbabilitySampler(); psamp != nil {
|
|
trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(psamp.SamplingProbability)})
|
|
} else if csamp := cfg.GetConstantSampler(); csamp != nil {
|
|
alwaysSample := csamp.Decision == tracepb.ConstantSampler_ALWAYS_ON
|
|
if alwaysSample {
|
|
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
|
|
} else {
|
|
trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()})
|
|
}
|
|
} else { // TODO: Add the rate limiting sampler here
|
|
}
|
|
|
|
// Then finally send back to upstream the newly applied configuration
|
|
err = configStream.Send(&agenttracepb.CurrentLibraryConfig{Config: &tracepb.TraceConfig{Sampler: cfg.Sampler}})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop shuts down all the connections and resources
|
|
// related to the exporter.
|
|
func (ae *Exporter) Stop() error {
|
|
ae.mu.RLock()
|
|
cc := ae.grpcClientConn
|
|
started := ae.started
|
|
stopped := ae.stopped
|
|
ae.mu.RUnlock()
|
|
|
|
if !started {
|
|
return errNotStarted
|
|
}
|
|
if stopped {
|
|
// TODO: tell the user that we've already stopped, so perhaps a sentinel error?
|
|
return nil
|
|
}
|
|
|
|
ae.Flush()
|
|
|
|
// Now close the underlying gRPC connection.
|
|
var err error
|
|
if cc != nil {
|
|
err = cc.Close()
|
|
}
|
|
|
|
// At this point we can change the state variables: started and stopped
|
|
ae.mu.Lock()
|
|
ae.started = false
|
|
ae.stopped = true
|
|
ae.mu.Unlock()
|
|
close(ae.stopCh)
|
|
|
|
// Ensure that the backgroundConnector returns
|
|
<-ae.backgroundConnectionDoneCh
|
|
|
|
return err
|
|
}
|
|
|
|
func (ae *Exporter) ExportSpan(sd *trace.SpanData) {
|
|
if sd == nil {
|
|
return
|
|
}
|
|
_ = ae.traceBundler.Add(sd, 1)
|
|
}
|
|
|
|
func (ae *Exporter) ExportTraceServiceRequest(batch *agenttracepb.ExportTraceServiceRequest) error {
|
|
if batch == nil || len(batch.Spans) == 0 {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-ae.stopCh:
|
|
return errStopped
|
|
|
|
default:
|
|
if lastConnectErr := ae.lastConnectError(); lastConnectErr != nil {
|
|
return fmt.Errorf("ExportTraceServiceRequest: no active connection, last connection error: %v", lastConnectErr)
|
|
}
|
|
|
|
ae.senderMu.Lock()
|
|
err := ae.traceExporter.Send(batch)
|
|
ae.senderMu.Unlock()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
ae.recvMu.Lock()
|
|
// Perform a .Recv to try to find out why the RPC actually ended.
|
|
// See:
|
|
// * https://github.com/grpc/grpc-go/blob/d389f9fac68eea0dcc49957d0b4cca5b3a0a7171/stream.go#L98-L100
|
|
// * https://groups.google.com/forum/#!msg/grpc-io/XcN4hA9HonI/F_UDiejTAwAJ
|
|
for {
|
|
_, err = ae.traceExporter.Recv()
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
ae.recvMu.Unlock()
|
|
}
|
|
|
|
ae.setStateDisconnected(err)
|
|
if err != io.EOF {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (ae *Exporter) ExportView(vd *view.Data) {
|
|
if vd == nil {
|
|
return
|
|
}
|
|
_ = ae.viewDataBundler.Add(vd, 1)
|
|
}
|
|
|
|
func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span {
|
|
if len(sdl) == 0 {
|
|
return nil
|
|
}
|
|
protoSpans := make([]*tracepb.Span, 0, len(sdl))
|
|
for _, sd := range sdl {
|
|
if sd != nil {
|
|
protoSpans = append(protoSpans, ocSpanToProtoSpan(sd))
|
|
}
|
|
}
|
|
return protoSpans
|
|
}
|
|
|
|
func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) {
|
|
select {
|
|
case <-ae.stopCh:
|
|
return
|
|
|
|
default:
|
|
if !ae.connected() {
|
|
return
|
|
}
|
|
|
|
protoSpans := ocSpanDataToPbSpans(sdl)
|
|
if len(protoSpans) == 0 {
|
|
return
|
|
}
|
|
ae.senderMu.Lock()
|
|
err := ae.traceExporter.Send(&agenttracepb.ExportTraceServiceRequest{
|
|
Spans: protoSpans,
|
|
})
|
|
ae.senderMu.Unlock()
|
|
if err != nil {
|
|
ae.setStateDisconnected(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func ocViewDataToPbMetrics(vdl []*view.Data) []*metricspb.Metric {
|
|
if len(vdl) == 0 {
|
|
return nil
|
|
}
|
|
metrics := make([]*metricspb.Metric, 0, len(vdl))
|
|
for _, vd := range vdl {
|
|
if vd != nil {
|
|
vmetric, err := viewDataToMetric(vd)
|
|
// TODO: (@odeke-em) somehow report this error, if it is non-nil.
|
|
if err == nil && vmetric != nil {
|
|
metrics = append(metrics, vmetric)
|
|
}
|
|
}
|
|
}
|
|
return metrics
|
|
}
|
|
|
|
func (ae *Exporter) uploadViewData(vdl []*view.Data) {
|
|
select {
|
|
case <-ae.stopCh:
|
|
return
|
|
|
|
default:
|
|
if !ae.connected() {
|
|
return
|
|
}
|
|
|
|
protoMetrics := ocViewDataToPbMetrics(vdl)
|
|
if len(protoMetrics) == 0 {
|
|
return
|
|
}
|
|
err := ae.metricsExporter.Send(&agentmetricspb.ExportMetricsServiceRequest{
|
|
Metrics: protoMetrics,
|
|
// TODO:(@odeke-em)
|
|
// a) Figure out how to derive a Node from the environment
|
|
// b) Figure out how to derive a Resource from the environment
|
|
// or better letting users of the exporter configure it.
|
|
})
|
|
if err != nil {
|
|
ae.setStateDisconnected(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ae *Exporter) Flush() {
|
|
ae.traceBundler.Flush()
|
|
ae.viewDataBundler.Flush()
|
|
}
|
|
|
|
func resourceProtoFromEnv() *resourcepb.Resource {
|
|
rs, _ := resource.FromEnv(context.Background())
|
|
if rs == nil {
|
|
return nil
|
|
}
|
|
|
|
rprs := &resourcepb.Resource{
|
|
Type: rs.Type,
|
|
}
|
|
if rs.Labels != nil {
|
|
rprs.Labels = make(map[string]string)
|
|
for k, v := range rs.Labels {
|
|
rprs.Labels[k] = v
|
|
}
|
|
}
|
|
return rprs
|
|
}
|