-
Notifications
You must be signed in to change notification settings - Fork 7
/
zipkin.rs
104 lines (90 loc) · 3.16 KB
/
zipkin.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use anyhow::{Context, Result};
use log::warn;
use serde_json::json;
use std::time::Duration;
use ureq;
use crate::{Event, TraceEvent};
use super::{
zipkin_formatter::{LocalEndpoint, Span, ZipkinFormatter},
Adapter, AdapterHandle,
};
/// An adapter to send events from your module to a [Zipkin Instance](https://zipkin.io/).
///
/// The struct has no fields, so it carries no per-instance state or
/// configuration. Use `ZipkinAdapter::create` to spawn the adapter and obtain
/// its `AdapterHandle`.
pub struct ZipkinAdapter {}
impl Adapter for ZipkinAdapter {
    /// Converts every root event of `trace_evt` into Zipkin spans sharing a
    /// single trace id and hands the resulting batch to `dump_trace`.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while converting an event or while
    /// dumping the collected spans.
    fn handle_trace_event(&mut self, trace_evt: TraceEvent) -> Result<()> {
        // One id for the whole trace; each span receives its own copy.
        let trace_id = trace_evt.telemetry_id.to_hex_16();
        let mut collected = Vec::new();

        // Root events have no parent span, hence `None`.
        trace_evt.events.into_iter().try_for_each(|root| {
            self.event_to_spans(&mut collected, root, None, trace_id.clone())
        })?;

        self.dump_trace(collected)
    }
}
impl ZipkinAdapter {
    /// Endpoint the collected spans are posted to.
    const SPANS_URL: &'static str = "http://localhost:9411/api/v2/spans";
    /// Service name attached to the first span of every trace.
    const SERVICE_NAME: &'static str = "my_service";
    /// Span name used when a function event carries no name.
    const UNKNOWN_SPAN_NAME: &'static str = "unknown-name";
    /// Tag key under which the accumulated allocation amount is stored.
    const ALLOC_TAG: &'static str = "amount";
    /// Upper bound on how long a single upload may take.
    const REQUEST_TIMEOUT: Duration = Duration::from_secs(1);

    /// Creates the Zipkin adapter and spawns a task for it.
    /// This should ideally be created once per process of
    /// your rust application.
    pub fn create() -> AdapterHandle {
        Self::spawn(Self {})
    }

    /// Recursively flattens `event` into `spans`.
    ///
    /// * `Event::Func` pushes a new span (child of `parent_id`) and then
    ///   recurses into the events recorded within the function, using the new
    ///   span's id as their parent.
    /// * `Event::Alloc` adds its amount to the `"amount"` tag of the span
    ///   identified by `parent_id`. When there is no parent id, or no span with
    ///   that id has been collected, the most recently pushed span is used.
    ///   If no span exists at all the allocation is dropped.
    /// * Every other event kind is ignored.
    ///
    /// `trace_id` is copied onto every span that is created.
    fn event_to_spans(
        &self,
        spans: &mut Vec<Span>,
        event: Event,
        parent_id: Option<String>,
        trace_id: String,
    ) -> Result<()> {
        match event {
            Event::Func(f) => {
                let name = f
                    .name
                    .clone()
                    .unwrap_or_else(|| Self::UNKNOWN_SPAN_NAME.to_string());
                let span = Span::new(trace_id.clone(), parent_id, name, f.start, f.end);
                let span_id = Some(span.id.clone());
                spans.push(span);
                for e in f.within.iter() {
                    self.event_to_spans(spans, e.to_owned(), span_id.clone(), trace_id.clone())?;
                }
            }
            Event::Alloc(a) => {
                // Charge the allocation to the enclosing function's span. The
                // last pushed span may be a nested call that already returned,
                // so it is only the fallback. Searching from the back finds the
                // parent quickly because it was pushed recently.
                let target = parent_id
                    .as_deref()
                    .and_then(|pid| spans.iter().rposition(|s| s.id.as_str() == pid))
                    .or_else(|| spans.len().checked_sub(1));

                if let Some(span) = target.and_then(|idx| spans.get_mut(idx)) {
                    // Accumulate in i64 (the width the tag is stored with) so a
                    // large running total can neither overflow u32 nor fail to
                    // parse back and silently reset.
                    let previous = span
                        .tags
                        .get(Self::ALLOC_TAG)
                        .and_then(|v| v.parse::<i64>().ok())
                        .unwrap_or(0);
                    let total = previous.saturating_add(i64::from(a.amount));
                    span.add_tag_i64(Self::ALLOC_TAG.to_string(), total);
                }
            }
            _ => {}
        }
        Ok(())
    }

    /// Serializes `spans` to JSON and posts them to the Zipkin endpoint.
    ///
    /// The first span is tagged with the local endpoint (service name) before
    /// sending. Delivery is best-effort: a failed request is logged with
    /// `warn!` and otherwise ignored.
    ///
    /// # Errors
    ///
    /// Returns an error when `spans` is empty or when the serialized value
    /// cannot be rendered to a string.
    fn dump_trace(&mut self, spans: Vec<Span>) -> Result<()> {
        let mut ztf = ZipkinFormatter::new();
        ztf.spans = spans;

        let first_span = ztf
            .spans
            .first_mut()
            .context("No spans to send to zipkin")?;
        first_span.local_endpoint = Some(LocalEndpoint {
            service_name: Some(Self::SERVICE_NAME.into()),
        });

        let j = json!(&ztf.spans);
        let body = serde_json::to_string(&j)?;

        // perhaps this should be an async operation with something
        // like reqwest?
        let response = ureq::post(Self::SPANS_URL)
            .timeout(Self::REQUEST_TIMEOUT)
            .set("Content-Type", "application/json")
            .send_string(&body);

        // should maybe retry or throw an exception
        // TODO check default logic of http client
        // Only the error case is a failure worth reporting.
        if let Err(err) = response {
            warn!("Request to Zipkin failed: {:#?}", err);
        }
        Ok(())
    }
}