API Authentication - Rust examples

Below are complete examples for authenticating with the Data Streams API in Rust. Each example shows how to properly generate the required headers and make a request.

To learn more about Data Streams API authentication, see the Data Streams Authentication page.

Note: The Data Streams SDKs handle authentication automatically. If you're using the Go SDK or Rust SDK, you don't need to implement the authentication logic manually.

API Authentication Example

Requirements

  • Rust (v1.70 or later recommended)
  • API credentials from Chainlink Data Streams

Running the Example

  1. Create a Cargo.toml file:

    [package]
    name = "chainlink-streams-direct-auth"
    version = "0.1.0"
    edition = "2021"
    
    [dependencies]
    # Async HTTP client; the `json` feature provides `Response::json()`
    reqwest = { version = "0.11", features = ["json"] }
    # The `derive` feature provides the `Deserialize`/`Serialize` derive macros
    serde = { version = "1.0", features = ["derive"] }
    serde_json = "1.0"
    # Async runtime behind `#[tokio::main]`
    tokio = { version = "1.32", features = ["full"] }
    # HMAC-SHA256 signing, SHA-256 body hashing and hex encoding of both digests
    hmac = "0.12"
    sha2 = "0.10"
    hex = "0.4"
    
  2. Create a src/main.rs file:

    use hmac::{Hmac, Mac};
    use reqwest::header::{HeaderMap, HeaderValue};
    use serde::{Deserialize, Serialize};
    use sha2::{Digest, Sha256};
    use std::env;
    use std::error::Error;
    use std::time::{SystemTime, UNIX_EPOCH};
    
    type HmacSha256 = Hmac<Sha256>;
    
    /// A single report as it appears under the `report` key of the REST response.
    ///
    /// Each field is mapped explicitly to the JSON key used by the API.
    #[derive(Debug, Deserialize, Serialize)]
    struct SingleReport {
        /// Feed identifier (JSON key `feedID`).
        #[serde(rename = "feedID")]
        feed_id: String,
        /// JSON key `validFromTimestamp`. Stored as `u64`, the same width the
        /// WebSocket example uses for this field, so both examples decode the
        /// same range of values.
        #[serde(rename = "validFromTimestamp")]
        valid_from_timestamp: u64,
        /// JSON key `observationsTimestamp`; `u64` for the same reason as above.
        #[serde(rename = "observationsTimestamp")]
        observations_timestamp: u64,
        /// The report payload as a string (JSON key `fullReport`).
        #[serde(rename = "fullReport")]
        full_report: String,
    }
    
    /// Top-level JSON envelope of the single-report response: the report itself
    /// is nested under the `report` key.
    #[derive(Debug, Deserialize, Serialize)]
    struct SingleReportResponse {
        report: SingleReport,
    }
    
    /// Computes the HMAC-SHA256 signature used to authenticate a request.
    ///
    /// The signed string is `method path body_hash api_key timestamp`, joined by
    /// single spaces, where `body_hash` is the hex-encoded SHA-256 of `body` and
    /// `timestamp` is the current time in milliseconds since the Unix epoch.
    ///
    /// Returns the hex-encoded signature together with the timestamp that was
    /// signed, so the caller can send exactly that value in its timestamp header.
    ///
    /// # Errors
    ///
    /// Returns an error if the system clock is set before the Unix epoch, or if
    /// the HMAC implementation rejects `api_secret` as a key.
    fn generate_hmac(
        method: &str,
        path: &str,
        body: &[u8],
        api_key: &str,
        api_secret: &str
    ) -> Result<(String, u128), Box<dyn Error>> {
        // Milliseconds since the Unix epoch. A clock set before the epoch is
        // reported to the caller instead of panicking, since this function
        // already returns a Result.
        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
    
        // Hex-encoded SHA-256 digest of the request body
        let body_hash = hex::encode(Sha256::digest(body));
    
        // Create string to sign
        let string_to_sign = format!("{} {} {} {} {}", method, path, body_hash, api_key, timestamp);
    
        // Generate HMAC-SHA256 signature keyed with the API secret
        let mut mac = HmacSha256::new_from_slice(api_secret.as_bytes())?;
        mac.update(string_to_sign.as_bytes());
        let signature = hex::encode(mac.finalize().into_bytes());
    
        Ok((signature, timestamp))
    }
    
    /// Builds the three authentication headers for a body-less request.
    ///
    /// Signs `method` and `path` with `generate_hmac` and returns a map holding
    /// `Authorization` (the API key), `X-Authorization-Timestamp` and
    /// `X-Authorization-Signature-SHA256`.
    ///
    /// # Errors
    ///
    /// Fails if signing fails or if any value is not a valid HTTP header value.
    fn generate_auth_headers(
        method: &str,
        path: &str,
        api_key: &str,
        api_secret: &str
    ) -> Result<HeaderMap, Box<dyn Error>> {
        // No request body is sent, so the signature covers an empty body
        let (sig_hex, ts_millis) = generate_hmac(method, path, &[], api_key, api_secret)?;
    
        // Convert the values in header order: key, timestamp, signature
        let key_value = HeaderValue::from_str(api_key)?;
        let ts_value = HeaderValue::from_str(&ts_millis.to_string())?;
        let sig_value = HeaderValue::from_str(&sig_hex)?;
    
        let mut auth = HeaderMap::new();
        auth.insert("Authorization", key_value);
        auth.insert("X-Authorization-Timestamp", ts_value);
        auth.insert("X-Authorization-Signature-SHA256", sig_value);
        Ok(auth)
    }
    
    /// Requests the latest report for `feed_id` from the testnet REST endpoint.
    ///
    /// Credentials are read from the `STREAMS_API_KEY` and `STREAMS_API_SECRET`
    /// environment variables.
    ///
    /// # Errors
    ///
    /// Fails if either variable is unset, if signing or the HTTP request fails,
    /// if the server answers with a non-success status (the status and response
    /// body are included in the message), or if the body cannot be decoded.
    async fn fetch_single_report(feed_id: &str) -> Result<SingleReport, Box<dyn Error>> {
        const METHOD: &str = "GET";
        const HOST: &str = "api.testnet-dataengine.chain.link";
        const PATH: &str = "/api/v1/reports/latest";
    
        // Each missing variable is reported with its own message
        let api_key = env::var("STREAMS_API_KEY")
            .map_err(|_| "API credentials not set. Please set STREAMS_API_KEY environment variable")?;
        let api_secret = env::var("STREAMS_API_SECRET")
            .map_err(|_| "API credentials not set. Please set STREAMS_API_SECRET environment variable")?;
    
        // The query string is part of the path that gets signed
        let signed_path = format!("{}?feedID={}", PATH, feed_id);
        let auth_headers = generate_auth_headers(METHOD, &signed_path, &api_key, &api_secret)?;
    
        let endpoint = format!("https://{}{}", HOST, signed_path);
        let response = reqwest::Client::new()
            .get(&endpoint)
            .headers(auth_headers)
            .send()
            .await?;
    
        // Capture the status first: reading the body consumes the response
        let status = response.status();
        if status.is_success() {
            let parsed: SingleReportResponse = response.json().await?;
            return Ok(parsed.report);
        }
    
        let error_text = response.text().await?;
        Err(format!("API error (status code {}): {}", status, error_text).into())
    }
    
    /// Fetches the latest report for one example feed and prints its fields.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `fetch_single_report`.
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Example feed ID (ETH/USD)
        let feed_id = "0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782";
    
        println!("Fetching latest report for feed ID: {}", feed_id);
    
        // Fetch the report
        let report = fetch_single_report(feed_id).await?;
    
        // Display the report
        println!("Successfully retrieved report:");
        println!("  Feed ID: {}", report.feed_id);
        println!("  Valid From: {}", report.valid_from_timestamp);
        println!("  Observations Timestamp: {}", report.observations_timestamp);
    
        // Show at most the first 40 characters of the full report. The cut point
        // comes from `char_indices`, so it always lies on a character boundary;
        // slicing at a fixed byte offset would panic on multi-byte text.
        let report_preview = match report.full_report.char_indices().nth(40) {
            Some((cut, _)) => format!("{}...", &report.full_report[..cut]),
            None => report.full_report.clone(),
        };
        println!("  Full Report: {}", report_preview);
    
        Ok(())
    }
    
  3. Set your API credentials as environment variables:

    export STREAMS_API_KEY="your-api-key"
    export STREAMS_API_SECRET="your-api-secret"
    
  4. Run with cargo run

Expected output:

Fetching latest report for feed ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
Successfully retrieved report:
  Feed ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
  Valid From: 1747933113
  Observations Timestamp: 1747933113
  Full Report: 0x00090d9e8d96765a0c49e03a6ae05c82e8f8de...

Production Considerations

While this example demonstrates the authentication mechanism, production applications should consider:

  • Connection resilience: Implement retry logic with exponential backoff for network failures
  • Error handling: Use custom error types instead of string errors for better error management
  • Logging: Replace println! with structured logging (e.g., tracing, env_logger)
  • Configuration: Make API endpoints configurable through environment variables
  • Resource management: Implement graceful shutdown for long-running connections
  • Testing: Add unit tests for HMAC generation and integration tests for API calls

For production use, consider using the Rust SDK which handles authentication automatically and provides built-in fault tolerance.

WebSocket Authentication Example

Requirements

  • Rust (v1.70 or later recommended)
  • API credentials from Chainlink Data Streams

Running the Example

  1. Create a Cargo.toml file:

    [package]
    name = "chainlink-streams-direct-auth"
    version = "0.1.0"
    edition = "2021"
    
    [dependencies]
    # Async runtime: `#[tokio::main]`, `tokio::spawn`, `tokio::signal::ctrl_c`
    tokio = { version = "1.32", features = ["full"] }
    # WebSocket client; `native-tls` is enabled for the `wss://` endpoint
    tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
    # `StreamExt::next` and `SinkExt::send` on the WebSocket stream
    futures-util = "0.3"
    # HMAC-SHA256 signing, SHA-256 body hashing and hex encoding of both digests
    hmac = "0.12"
    sha2 = "0.10"
    hex = "0.4"
    # JSON decoding of incoming report messages
    serde = { version = "1.0", features = ["derive"] }
    serde_json = "1.0"
    
  2. Create a src/main.rs file:

    use hmac::{ Hmac, Mac };
    use sha2::{ Digest, Sha256 };
    use std::{ env, error::Error, time::{ SystemTime, UNIX_EPOCH } };
    use tokio_tungstenite::{
        connect_async,
        tungstenite::client::IntoClientRequest,
        tungstenite::protocol::Message,
    };
    use futures_util::{ StreamExt, SinkExt };
    use serde::{ Deserialize, Serialize };
    
    type HmacSha256 = Hmac<Sha256>;
    
    /// JSON envelope of a WebSocket report message: the report itself is nested
    /// under the `report` key.
    #[derive(Debug, Deserialize, Serialize)]
    struct ReportWrapper {
        report: Report,
    }
    
    /// One report as carried inside a WebSocket message.
    ///
    /// JSON keys are the camelCase form of the field names (`fullReport`,
    /// `validFromTimestamp`, `observationsTimestamp`), except `feedID`.
    #[derive(Debug, Deserialize, Serialize)]
    #[serde(rename_all = "camelCase")]
    struct Report {
        // `feedID` ends in two capitals, which camelCase conversion would not
        // produce, so it keeps an explicit name.
        #[serde(rename = "feedID")]
        feed_id: String,
        full_report: String,
        valid_from_timestamp: u64,
        observations_timestamp: u64,
    }
    
    /// Computes the HMAC-SHA256 signature used to authenticate a request.
    ///
    /// The signed string is `method path body_hash api_key timestamp`, joined by
    /// single spaces, where `body_hash` is the hex-encoded SHA-256 of `body` and
    /// `timestamp` is the current time in milliseconds since the Unix epoch.
    ///
    /// Returns the hex-encoded signature together with the timestamp that was
    /// signed, so the caller can send exactly that value in its timestamp header.
    ///
    /// # Errors
    ///
    /// Returns an error if the system clock is set before the Unix epoch, or if
    /// the HMAC implementation rejects `api_secret` as a key.
    fn generate_hmac(
        method: &str,
        path: &str,
        body: &[u8],
        api_key: &str,
        api_secret: &str
    ) -> Result<(String, u128), Box<dyn Error>> {
        // Milliseconds since the Unix epoch. A clock set before the epoch is
        // reported to the caller instead of panicking, since this function
        // already returns a Result.
        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
    
        // Hex-encoded SHA-256 digest of the request body
        let body_hash = hex::encode(Sha256::digest(body));
    
        // Create string to sign
        let string_to_sign = format!("{} {} {} {} {}", method, path, body_hash, api_key, timestamp);
    
        // Generate HMAC-SHA256 signature keyed with the API secret
        let mut mac = HmacSha256::new_from_slice(api_secret.as_bytes())?;
        mac.update(string_to_sign.as_bytes());
        let signature = hex::encode(mac.finalize().into_bytes());
    
        Ok((signature, timestamp))
    }
    
    /// Opens an authenticated WebSocket connection that subscribes to `feed_ids`.
    ///
    /// The feed IDs are comma-joined into the `feedIDs` query parameter, the
    /// resulting path is signed with `generate_hmac`, and the three
    /// authentication headers are attached to the handshake request.
    ///
    /// # Errors
    ///
    /// Fails if `feed_ids` is empty, if signing fails, if the request or a header
    /// value cannot be built, or if the WebSocket handshake fails.
    async fn setup_websocket_connection(
        api_key: &str,
        api_secret: &str,
        feed_ids: &[&str]
    ) -> Result<
        tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
        Box<dyn Error>
    > {
        const HOST: &str = "ws.testnet-dataengine.chain.link";
        const PATH: &str = "/api/v1/ws";
    
        // Reject an empty subscription before building anything
        if feed_ids.is_empty() {
            return Err("No feed ID(s) provided".into());
        }
    
        // The query string is part of the path that gets signed
        let signed_path = format!("{}?feedIDs={}", PATH, feed_ids.join(","));
        let (sig_hex, ts_millis) = generate_hmac("GET", &signed_path, &[], api_key, api_secret)?;
    
        let endpoint = format!("wss://{}{}", HOST, signed_path);
        println!("Connecting to: {}", endpoint);
    
        // Attach the authentication headers to the handshake request
        let mut handshake = endpoint.into_client_request()?;
        let auth = handshake.headers_mut();
        auth.insert("Authorization", api_key.parse()?);
        auth.insert("X-Authorization-Timestamp", ts_millis.to_string().parse()?);
        auth.insert("X-Authorization-Signature-SHA256", sig_hex.parse()?);
    
        // Only the stream is needed; the handshake response is discarded
        let (socket, _) = connect_async(handshake).await?;
        println!("WebSocket connection established");
    
        Ok(socket)
    }
    
    // Handle incoming WebSocket messages
    async fn handle_messages(
        mut ws_stream: tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>
    ) {
        println!("Waiting for incoming messages... (press Ctrl+C to exit)");
    
        // Process messages as they arrive
        while let Some(msg) = ws_stream.next().await {
            match msg {
                Ok(msg) => {
                    match msg {
                        Message::Text(text) => {
                            // Try to parse JSON
                            if let Ok(report_wrapper) = serde_json::from_str::<ReportWrapper>(&text) {
                                let report = &report_wrapper.report;
                                println!("\nReceived new report:");
                                println!("  Feed ID: {}", report.feed_id);
                                println!("  Valid From: {}", report.valid_from_timestamp);
                                println!("  Observations Timestamp: {}", report.observations_timestamp);
    
                                // Display the full report with truncation for readability
                                let report_preview = if report.full_report.len() > 40 {
                                    format!("{}...", &report.full_report[..40])
                                } else {
                                    report.full_report.clone()
                                };
                                println!("  Full Report: {}", report_preview);
                            } else {
                                println!("Received text message: {}", text);
                            }
                        }
                        Message::Binary(data) => {
                            // Try to parse binary as JSON
                            if let Ok(text) = String::from_utf8(data.clone()) {
                                if
                                    let Ok(report_wrapper) = serde_json::from_str::<ReportWrapper>(
                                        &text
                                    )
                                {
                                    let report = &report_wrapper.report;
                                    println!("\nReceived new report:");
                                    println!("  Feed ID: {}", report.feed_id);
                                    println!("  Valid From: {}", report.valid_from_timestamp);
                                    println!(
                                        "  Observations Timestamp: {}",
                                        report.observations_timestamp
                                    );
    
                                    // Display the full report with truncation
                                    let report_preview = if report.full_report.len() > 40 {
                                        format!("{}...", &report.full_report[..40])
                                    } else {
                                        report.full_report.clone()
                                    };
                                    println!("  Full Report: {}", report_preview);
                                } else {
                                    println!("Received binary message: {} bytes", data.len());
                                }
                            } else {
                                println!("Received binary message (not UTF-8): {} bytes", data.len());
                            }
                        }
                        Message::Ping(ping_data) => {
                            println!("Received ping, sending pong response");
                            // Send a pong with the same data to keep the connection alive
                            if let Err(e) = ws_stream.send(Message::Pong(ping_data)).await {
                                eprintln!("Error sending pong: {}", e);
                            }
                        }
                        Message::Pong(_) => println!("Received pong"),
                        Message::Close(_) => {
                            println!("Received close message");
                            break;
                        }
                        Message::Frame(_) => println!("Received raw frame"),
                    }
                }
                Err(e) => {
                    eprintln!("Error receiving message: {}", e);
                    break;
                }
            }
        }
    }
    
    /// Connects to the WebSocket endpoint for one example feed and prints
    /// incoming reports until Ctrl+C is pressed or the connection ends.
    ///
    /// # Errors
    ///
    /// Fails if a credential variable is unset, if the connection cannot be
    /// established, if the Ctrl+C handler cannot be installed, or if the
    /// message-handling task panics.
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Get API credentials from environment variables
        let api_key = env::var("STREAMS_API_KEY")
            .map_err(|_| "API credentials not set. Please set STREAMS_API_KEY environment variable")?;
        let api_secret = env::var("STREAMS_API_SECRET")
            .map_err(|_| "API credentials not set. Please set STREAMS_API_SECRET environment variable")?;
    
        // Example feed IDs (ETH/USD)
        let feed_ids = ["0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782"];
    
        // Set up WebSocket connection
        let ws_stream = setup_websocket_connection(&api_key, &api_secret, &feed_ids).await?;
    
        // Handle WebSocket traffic on its own task
        let mut ws_task = tokio::spawn(handle_messages(ws_stream));
    
        // Stop on whichever comes first: Ctrl+C, or the handler task finishing
        // (close frame, receive error, end of stream). Waiting on Ctrl+C alone
        // would leave the process hanging after the connection is gone.
        tokio::select! {
            signal = tokio::signal::ctrl_c() => {
                signal?;
                println!("Shutting down...");
                ws_task.abort();
            }
            joined = &mut ws_task => {
                joined?;
                println!("WebSocket task finished; exiting");
            }
        }
    
        Ok(())
    }
    
  3. Set your API credentials as environment variables:

    export STREAMS_API_KEY="your-api-key"
    export STREAMS_API_SECRET="your-api-secret"
    
  4. Run with cargo run

Expected output:

Connecting to: wss://ws.testnet-dataengine.chain.link/api/v1/ws?feedIDs=0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
WebSocket connection established
Waiting for incoming messages... (press Ctrl+C to exit)
Received ping, sending pong response

Received new report:
  Feed ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
  Valid From: 1747934358
  Observations Timestamp: 1747934358
  Full Report: 0x00090d9e8d96765a0c49e03a6ae05c82e8f8de...

Received new report:
  Feed ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
  Valid From: 1747934359
  Observations Timestamp: 1747934359
  Full Report: 0x00090d9e8d96765a0c49e03a6ae05c82e8f8de...

^CShutting down...

Production Considerations

While this example demonstrates WebSocket authentication, production applications should consider:

  • Connection resilience: Implement automatic reconnection with exponential backoff
  • Heartbeat mechanism: Send periodic pings to detect stale connections
  • Message buffering: Queue messages during reconnection attempts
  • Error handling: Use custom error types for better error categorization
  • Logging: Replace println! with structured logging (e.g., tracing, env_logger)
  • Configuration: Make WebSocket endpoints and timeouts configurable
  • Graceful shutdown: Properly close WebSocket connections with close frames
  • Testing: Add tests for connection handling and message parsing

For production use, consider using the Rust SDK which handles authentication automatically and provides built-in fault tolerance.

Get the latest Chainlink content straight to your inbox.