use std::collections::HashMap;
use std::time::SystemTime;
use dotenvy::dotenv;
use futures::{StreamExt, SinkExt};
use tokio::sync::mpsc;
use tracing::{debug, error, info};
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::prelude::{
    subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
    SubscribeRequestFilterBlocksMeta, SubscribeUpdate,
};
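
/// Which upstream a BlockMeta measurement came from.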
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Source {
    Yellowstone,
    Laserstream,
}
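
/// Per-slot receive timestamps (Unix millis) from each source.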
#[derive(Default, Debug)]
struct SlotTimings {
    ys_recv_ms: Option<i128>,
    ls_recv_ms: Option<i128>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let _ = dotenv();
    tracing_subscriber::fmt().with_env_filter("info").init();

    let ys_url = std::env::var("YS_GRPC_URL").expect("YS_GRPC_URL env variable not set");
    let ls_url = std::env::var("LS_GRPC_URL").expect("LS_GRPC_URL env variable not set");
    let ys_api_key = std::env::var("YS_API_KEY").ok();
    let ls_api_key = std::env::var("LS_API_KEY").ok();

    let commitment = CommitmentLevel::Processed;
    info!(?commitment, "Starting latency comparison");
    // Establish both clients
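    // Both are configured identically (TLS roots, optional x-token auth,
    // 10 MiB decode limit) so transport settings do not skew the comparison.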
    let mut ys_client = GeyserGrpcClient::build_from_shared(ys_url.clone())
        .expect("invalid YS url")
        .x_token(ys_api_key.clone())?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .max_decoding_message_size(10 * 1024 * 1024)
        .connect()
        .await?;
    let mut ls_client = GeyserGrpcClient::build_from_shared(ls_url.clone())
        .expect("invalid LS url")
        .x_token(ls_api_key.clone())?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .max_decoding_message_size(10 * 1024 * 1024)
        .connect()
        .await?;
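
    // Each subscribe() call yields a sink for sending subscription requests
    // and a stream of updates on that connection.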
    let (mut ys_tx, mut ys_rx) = ys_client.subscribe().await?;
    let (mut ls_tx, mut ls_rx) = ls_client.subscribe().await?;

    let subscribe_request = SubscribeRequest {
        blocks_meta: {
            let mut m = HashMap::<String, SubscribeRequestFilterBlocksMeta>::new();
            m.insert("all".to_string(), SubscribeRequestFilterBlocksMeta::default());
            m
        },
        commitment: Some(commitment as i32),
        ..Default::default()
    };
    ys_tx.send(subscribe_request.clone()).await?;
    ls_tx.send(subscribe_request).await?;
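
    // Both endpoints receive the same BlockMeta subscription, so the per-slot
    // receive timestamps are directly comparable.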
    let (agg_tx, mut agg_rx) = mpsc::unbounded_channel::<(Source, u64, i128)>();

    // Spawn task for Yellowstone stream
    {
        let agg_tx = agg_tx.clone();
        tokio::spawn(async move {
            while let Some(update_res) = ys_rx.next().await {
                match update_res {
                    Ok(update) => handle_update(Source::Yellowstone, update, &agg_tx).await,
                    Err(e) => {
                        error!(target: "ys", "stream error: {:?}", e);
                        break;
                    }
                }
            }
        });
    }

    // Spawn task for Laserstream stream
    {
        let agg_tx = agg_tx.clone();
        tokio::spawn(async move {
            while let Some(update_res) = ls_rx.next().await {
                match update_res {
                    Ok(update) => handle_update(Source::Laserstream, update, &agg_tx).await,
                    Err(e) => {
                        error!(target: "ls", "stream error: {:?}", e);
                        break;
                    }
                }
            }
        });
    }
    // Aggregator: collect receive times per slot and print once both sources
    // have reported.
    let mut timings: HashMap<u64, SlotTimings> = HashMap::new();
    let mut deltas: Vec<i128> = Vec::new();
    let mut count = 0;
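
    // CSV rows go to stdout; periodic summary statistics go to stderr.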
println!("slot,ys_recv_ms,ls_recv_ms,delta_ms");
while let Some((source, slot, latency_ms)) = agg_rx.recv().await {
let entry = timings.entry(slot).or_default();
match source {
Source::Yellowstone => entry.ys_recv_ms = Some(latency_ms),
Source::Laserstream => entry.ls_recv_ms = Some(latency_ms),
}
if let (Some(ys), Some(ls)) = (entry.ys_recv_ms, entry.ls_recv_ms) {
let delta = ys - ls; // positive => YS arrived later
println!("{slot},{ys},{ls},{delta}");
deltas.push(delta);
count += 1;
if count % 100 == 0 {
print_statistics(&deltas, count);
}
timings.remove(&slot);
}
}
Ok(())
}
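
/// Extracts the slot from BlockMeta updates and forwards the local receive
/// time (Unix millis) to the aggregator; other update kinds are ignored.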
async fn handle_update(
    source: Source,
    update: SubscribeUpdate,
    agg_tx: &mpsc::UnboundedSender<(Source, u64, i128)>,
) {
    if let Some(UpdateOneof::BlockMeta(block_meta)) = update.update_oneof {
        let slot = block_meta.slot;
        let recv_ms = system_time_to_millis(SystemTime::now());
        debug!(?source, slot, recv_ms, "BlockMeta received");
        let _ = agg_tx.send((source, slot, recv_ms));
    }
}
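
/// Prints summary statistics of the collected deltas (YS - LS, in ms) to
/// stderr so the stdout CSV stays machine-readable.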
fn print_statistics(deltas: &[i128], count: usize) {
    if deltas.is_empty() {
        return;
    }
    let mut sorted_deltas = deltas.to_vec();
    sorted_deltas.sort();

    let median = if sorted_deltas.len() % 2 == 0 {
        let mid = sorted_deltas.len() / 2;
        (sorted_deltas[mid - 1] + sorted_deltas[mid]) / 2
    } else {
        sorted_deltas[sorted_deltas.len() / 2]
    };
    let min = *sorted_deltas.first().unwrap();
    let max = *sorted_deltas.last().unwrap();
    let sum: i128 = sorted_deltas.iter().sum();
    let mean = sum / sorted_deltas.len() as i128;
    let p25_idx = (sorted_deltas.len() as f64 * 0.25) as usize;
    let p75_idx = (sorted_deltas.len() as f64 * 0.75) as usize;
    let p95_idx = (sorted_deltas.len() as f64 * 0.95) as usize;
    let p25 = sorted_deltas[p25_idx.min(sorted_deltas.len() - 1)];
    let p75 = sorted_deltas[p75_idx.min(sorted_deltas.len() - 1)];
    let p95 = sorted_deltas[p95_idx.min(sorted_deltas.len() - 1)];
    let positive = sorted_deltas.iter().filter(|&&x| x > 0).count();

    eprintln!("--- Statistics after {} slots ---", count);
    eprintln!("Delta (YS - LS) in milliseconds:");
    eprintln!(" Min: {}, Max: {}", min, max);
    eprintln!(" Mean: {}, Median: {}", mean, median);
    eprintln!(" P25: {}, P75: {}, P95: {}", p25, p75, p95);
    eprintln!(
        " Positive deltas (YS slower): {}/{} ({:.1}%)",
        positive,
        sorted_deltas.len(),
        positive as f64 / sorted_deltas.len() as f64 * 100.0
    );
    eprintln!("---");
}
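
/// Milliseconds since the Unix epoch for the given `SystemTime`.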
fn system_time_to_millis(st: SystemTime) -> i128 {
    st.duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis() as i128
}