Stream and decode V3 reports via WebSocket using the Rust SDK

In this guide, you'll learn how to use Chainlink Data Streams with the Streams Direct implementation and the Data Streams SDK for Rust to subscribe to real-time V3 reports for Crypto streams over a WebSocket connection. You'll set up your Rust project, listen for real-time reports from the Data Streams Aggregation Network, decode the report data, and log its attributes to your terminal.

Requirements

  • Git: Make sure you have Git installed. You can check your current version by running git --version in your terminal and download the latest version from the official Git website if necessary.
  • Rust: Make sure you have Rust installed. You can install Rust by following the instructions on the official Rust website.
  • API Credentials: Access to the Streams Direct implementation requires API credentials. If you haven't already, contact us to request mainnet or testnet early access.

Guide

Set up your Rust project

  1. Create a new directory for your project and navigate to it:

    mkdir my-data-streams-project && cd my-data-streams-project
    
  2. Initialize a new Rust project:

    cargo init
    
  3. Add the Data Streams SDK to your Cargo.toml:

    [dependencies]
    data-streams-sdk = { git = "https://github.com/smartcontractkit/data-streams-sdk.git", subdir = "rust/crates/sdk", features = ["tracing"] }
    data-streams-report = { git = "https://github.com/smartcontractkit/data-streams-sdk.git", subdir = "rust/crates/report" }
    tokio = { version = "1.4", features = ["full"] }
    hex = "0.4"
    tracing = "0.1"
    tracing-subscriber = { version = "0.3", features = ["time"] }
    

    Note: The tracing feature is required for logging functionality.

Establish a WebSocket connection and listen for real-time reports

  1. Replace the contents of src/main.rs with the following code:

    use data_streams_report::feed_id::ID;
    use data_streams_report::report::{decode_full_report, v3::ReportDataV3}; // Import the v3 report schema for Crypto streams
    use data_streams_sdk::config::Config;
    use data_streams_sdk::stream::Stream;
    use std::env;
    use std::error::Error;
    use tracing::{info, warn};
    use tracing_subscriber::fmt::time::UtcTime;
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Initialize logging with UTC timestamps
        tracing_subscriber::fmt()
            .with_timer(UtcTime::rfc_3339())
            .with_max_level(tracing::Level::INFO)
            .init();
    
        // Require at least one stream ID on the command line
        let args: Vec<String> = env::args().collect();
        if args.len() < 2 {
            eprintln!("Usage: cargo run [StreamID1] [StreamID2] ...");
            std::process::exit(1);
        }
    
        // Get API credentials from environment variables
        let api_key = env::var("API_KEY").expect("API_KEY must be set");
        let api_secret = env::var("API_SECRET").expect("API_SECRET must be set");
    
        // Parse feed IDs from command line arguments
        let mut feed_ids = Vec::new();
        for arg in args.iter().skip(1) {
            let feed_id = ID::from_hex_str(arg)?;
            feed_ids.push(feed_id);
        }
    
        // Initialize the configuration
        let config = Config::new(
            api_key,
            api_secret,
            "https://api.testnet-dataengine.chain.link".to_string(),
            "wss://ws.testnet-dataengine.chain.link".to_string(),
        )
        .build()?;
    
        // Create and initialize the stream
        let mut stream = Stream::new(&config, feed_ids).await?;
        stream.listen().await?;
    
        info!("WebSocket connection established. Listening for reports...");
    
        // Process incoming reports
        loop {
            match stream.read().await {
                Ok(response) => {
                    info!("\nRaw report data: {:?}\n", response.report);
    
                    // Decode the report
                    let full_report = hex::decode(&response.report.full_report[2..])?;
                    let (_report_context, report_blob) = decode_full_report(&full_report)?;
                    let report_data = ReportDataV3::decode(&report_blob)?;
    
                    // Print decoded report details
                    info!(
                        "\n--- Report Stream ID: {} ---\n\
                         ------------------------------------------\n\
                         Observations Timestamp : {}\n\
                         Price                 : {}\n\
                         Bid                   : {}\n\
                         Ask                   : {}\n\
                         Valid From Timestamp  : {}\n\
                         Expires At           : {}\n\
                         Link Fee             : {}\n\
                         Native Fee           : {}\n\
                         ------------------------------------------",
                        response.report.feed_id.to_hex_string(),
                        response.report.observations_timestamp,
                        report_data.benchmark_price,
                        report_data.bid,
                        report_data.ask,
                        response.report.valid_from_timestamp,
                        report_data.expires_at,
                        report_data.link_fee,
                        report_data.native_fee,
                    );
    
                    // Print stream stats
                    info!(
                        "\n--- Stream Stats ---\n{:#?}\n\
                         --------------------------------------------------------------------------------------------------------------------------------------------",
                        stream.get_stats()
                    );
                }
                Err(e) => {
                    warn!("Error reading from stream: {:?}", e);
                }
            }
        }
    
        // Note: In a production environment, you should implement proper cleanup
        // by calling stream.close() when the application is terminated.
        // For example:
        //
        // tokio::select! {
        //     _ = tokio::signal::ctrl_c() => {
        //         info!("Received shutdown signal");
        //         stream.close().await?;
        //     }
        //     result = stream.read() => {
        //         // Process result
        //     }
        // }
    }
    
  2. Set up your API credentials as environment variables:

    export API_KEY="<YOUR_API_KEY>"
    export API_SECRET="<YOUR_API_SECRET>"
    

    Replace <YOUR_API_KEY> and <YOUR_API_SECRET> with your API credentials.

  3. For this example, you'll subscribe to the ETH/USD crypto stream, whose stream ID is 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782. See the Crypto Streams page for a complete list of available crypto assets.

    Build and run your application:

    cargo run -- 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
    

    Expect output similar to the following in your terminal:

    2024-12-13T23:07:56.463719Z  INFO my_data_streams_project: WebSocket connection established. Listening for reports...
    2024-12-13T23:07:56.463824Z  INFO data_streams_sdk::stream::monitor_connection: Received ping: [49]
    2024-12-13T23:07:56.463868Z  INFO data_streams_sdk::stream::monitor_connection: Responding with pong: [49]
    2024-12-13T23:07:57.060504Z  INFO data_streams_sdk::stream::monitor_connection: Received new report from Data Streams Endpoint.
    2024-12-13T23:07:57.061078Z  INFO my_data_streams_project:
    Raw report data: Report { feed_id: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782, valid_from_timestamp: 1734131277, observations_timestamp: 1734131277, full_report: "0x0006f9b553e393ced311551efd30d1decedb63d76ad41737462e2cdbbdff1578000000000000000000000000000000000000000000000000000000004f5ac90d000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000220000000000000000000000000000000000000000000000000000000000000028001010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000120000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba78200000000000000000000000000000000000000000000000000000000675cbe4d00000000000000000000000000000000000000000000000000000000675cbe4d000000000000000000000000000000000000000000000000000017489b06e8fc000000000000000000000000000000000000000000000000000c7badb615bd1400000000000000000000000000000000000000000000000000000000675e0fcd0000000000000000000000000000000000000000000000d3c0d34ca0d14d85600000000000000000000000000000000000000000000000d3bda64c97c9f3a3a00000000000000000000000000000000000000000000000d3c1a08e0cffd77690000000000000000000000000000000000000000000000000000000000000000238102110cad488ecf151a17276fcfad6ef1f05593edfe80f6823b729416f826972ba32d085525b1d7ab79e6ae8188928c86051a4fc75f500bffabda2acd1d1f900000000000000000000000000000000000000000000000000000000000000024dddbc660abf75c30cb3c2aa375c87d228b2ee8735e339f59c5214897c0b89af39a7602df754364cce029f6eb7699ee02ffded96d0c46b5919e81ee4f650d1cb" }
    
    2024-12-13T23:07:57.062344Z  INFO my_data_streams_project:
    --- Report Stream ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 ---
    ------------------------------------------
    Observations Timestamp : 1734131277
    Price                 : 3906157533081673500000
    Bid                   : 3905928693886829700000
    Ask                   : 3906215307384792250000
    Valid From Timestamp  : 1734131277
    Expires At           : 1734217677
    Link Fee             : 3513685734964500
    Native Fee           : 25600606005500
    ------------------------------------------
    2024-12-13T23:07:57.062489Z  INFO my_data_streams_project:
    --- Stream Stats ---
    StatsSnapshot {
        accepted: 1,
        deduplicated: 0,
        total_received: 1,
        partial_reconnects: 0,
        full_reconnects: 0,
        configured_connections: 1,
        active_connections: 1,
    }
    --------------------------------------------------------------------------------------------------------------------------------------------
    2024-12-13T23:07:58.065686Z  INFO data_streams_sdk::stream::monitor_connection: Received new report from Data Streams Endpoint.
    2024-12-13T23:07:58.066315Z  INFO my_data_streams_project:
    Raw report data: Report { feed_id: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782, valid_from_timestamp: 1734131278, observations_timestamp: 1734131278, full_report: "0x0006f9b553e393ced311551efd30d1decedb63d76ad41737462e2cdbbdff1578000000000000000000000000000000000000000000000000000000004f5ac911000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000220000000000000000000000000000000000000000000000000000000000000028000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000120000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba78200000000000000000000000000000000000000000000000000000000675cbe4e00000000000000000000000000000000000000000000000000000000675cbe4e00000000000000000000000000000000000000000000000000001748ee300af4000000000000000000000000000000000000000000000000000c7b8b51304fbc00000000000000000000000000000000000000000000000000000000675e0fce0000000000000000000000000000000000000000000000d3bddf08d0b10a28e00000000000000000000000000000000000000000000000d3bb84af9f92f963c00000000000000000000000000000000000000000000000d3bf1d6bf14e501fc000000000000000000000000000000000000000000000000000000000000000021402b6b82c20826315384d74b3235b95f136ac65bba8c9e97c24d786e499894f298b51ae4aeba55cce0f85f2463e49e0a5e001b9a66f5b7b91e8be37d81d6cc5000000000000000000000000000000000000000000000000000000000000000217895cb599abc88d7b695edafed5ca5a5fc970f079b48bc2218888eec1fcccb0430c1ba2aa13b0d10f6c6b19a43cdb770029f4fb5804b0e2ef5ba3e73ca710f8" }
    
    2024-12-13T23:07:58.067395Z  INFO my_data_streams_project:
    --- Report Stream ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 ---
    ------------------------------------------
    Observations Timestamp : 1734131278
    Price                 : 3905944663438106700000
    Bid                   : 3905775117434634200000
    Ask                   : 3906034281472429400000
    Valid From Timestamp  : 1734131278
    Expires At           : 1734217678
    Link Fee             : 3513538013319100
    Native Fee           : 25602001210100
    ------------------------------------------
    2024-12-13T23:07:58.067633Z  INFO my_data_streams_project:
    --- Stream Stats ---
    StatsSnapshot {
        accepted: 2,
        deduplicated: 0,
        total_received: 2,
        partial_reconnects: 0,
        full_reconnects: 0,
        configured_connections: 1,
        active_connections: 1,
    }
    
    [...]
    

The example above demonstrates streaming data from a single crypto stream. For production environments, especially when subscribing to multiple streams, it's recommended to enable High Availability (HA) mode. This can be achieved by:

  1. Adding multiple WebSocket endpoints to the configuration as a comma-separated list:

    "wss://ws.testnet-dataengine.chain.link,wss://ws.testnet-dataengine.chain.link"
    
  2. Enabling HA mode in the configuration:

    use data_streams_sdk::config::WebSocketHighAvailability;
    // ...
    .with_ws_ha(WebSocketHighAvailability::Enabled)
    

When HA mode is enabled and multiple WebSocket origins are provided, the Stream maintains concurrent connections to different instances. This provides high availability and fault tolerance, and it minimizes the risk of report gaps.
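With several concurrent connections, the same report can arrive more than once, which is what the deduplicated counter in the stream stats reflects. The following is a minimal, std-only sketch of one plausible way to deduplicate: keep the newest observations timestamp seen per stream and drop anything that is not newer. The Deduplicator type and its accept method are hypothetical names for illustration; the SDK handles deduplication internally, and its actual strategy may differ.

```rust
use std::collections::HashMap;

/// Hypothetical helper: tracks the newest observations timestamp per stream ID.
#[derive(Default)]
struct Deduplicator {
    watermarks: HashMap<String, u64>,
    accepted: u64,
    deduplicated: u64,
}

impl Deduplicator {
    /// Returns true if the report is newer than anything seen for this stream.
    /// Reports with an equal or older timestamp are counted as duplicates.
    fn accept(&mut self, stream_id: &str, observations_timestamp: u64) -> bool {
        // Copy the stored watermark out so the map is free to be updated below.
        let seen = self.watermarks.get(stream_id).copied();
        if seen.map_or(false, |s| observations_timestamp <= s) {
            self.deduplicated += 1;
            return false;
        }
        self.watermarks
            .insert(stream_id.to_string(), observations_timestamp);
        self.accepted += 1;
        true
    }
}
```

Each connection would call accept before handing a report to the application, so only the first copy of a given report gets through.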

You can subscribe to multiple streams by providing additional stream IDs as command-line arguments:

    cargo run -- \
      0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 \
      0x00037da06d56d083fe599397a4769a042d63aa73dc4ef57709d31e9971a5b439

This will subscribe to both ETH/USD and BTC/USD streams.
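Because stream IDs are passed as raw command-line strings, it can help to check them before subscribing. The sketch below uses only the standard library and is not the SDK's ID::from_hex_str. It parses a 0x-prefixed ID into 32 bytes and reads the first two bytes as a schema version. Treating those two bytes as the version is an assumption, consistent with both IDs in this guide starting with 0x0003 for V3 reports. The function names parse_stream_id and schema_version are hypothetical.

```rust
/// Parses a 0x-prefixed, 64-hex-digit stream ID into its 32 raw bytes.
fn parse_stream_id(s: &str) -> Result<[u8; 32], String> {
    let hex = s
        .strip_prefix("0x")
        .ok_or_else(|| format!("missing 0x prefix: {}", s))?;
    // Checking for ASCII hex digits first also makes the byte slicing below safe.
    if hex.len() != 64 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err(format!("expected 64 hex digits: {}", s));
    }
    let mut id = [0u8; 32];
    for (i, byte) in id.iter_mut().enumerate() {
        *byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16)
            .map_err(|e| format!("invalid hex at byte {}: {}", i, e))?;
    }
    Ok(id)
}

/// Reads the first two bytes as a big-endian schema version.
/// Assumption: the ID prefix encodes the report schema (3 for V3).
fn schema_version(id: &[u8; 32]) -> u16 {
    u16::from_be_bytes([id[0], id[1]])
}
```

A caller could reject any argument whose version is not 3 before opening the connection, since this guide decodes every report with the V3 schema.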

Decoded report details

The decoded report details include:

| Attribute | Value | Description |
|---|---|---|
| Stream ID | 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 | The unique identifier for the stream. In this example, the stream is for ETH/USD. |
| Observations Timestamp | 1734131277 | The timestamp indicating when the data was captured. |
| Price | 3906157533081673500000 | The observed price in the report, with 18 decimals. For readability: 3,906.1575330816735 USD per ETH. |
| Bid | 3905928693886829700000 | The highest price a buyer is willing to pay for an asset, with 18 decimals. For readability: 3,905.9286938868297 USD per ETH. Learn more about the Bid price. |
| Ask | 3906215307384792250000 | The lowest price a seller is willing to accept for an asset, with 18 decimals. For readability: 3,906.2153073847923 USD per ETH. Learn more about the Ask price. |
| Valid From Timestamp | 1734131277 | The start validity timestamp for the report, indicating when the data becomes relevant. |
| Expires At | 1734217677 | The expiration timestamp of the report, indicating the point at which the data becomes outdated. |
| Link Fee | 3513685734964500 | The fee to pay in LINK tokens for the onchain verification of the report data. With 18 decimals. For readability: 0.0035136857349645 LINK. Note: This example fee is not indicative of actual fees. |
| Native Fee | 25600606005500 | The fee to pay in the native blockchain token (e.g., ETH on Ethereum) for the onchain verification of the report data. With 18 decimals. For readability: 0.0000256006060055 ETH. Note: This example fee is not indicative of actual fees. |
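The "for readability" figures above come from dividing each raw integer by 10^18. A minimal sketch of that conversion, using integer arithmetic so no precision is lost, might look like the following. The helper name format_fixed is hypothetical, and the sketch assumes the value is non-negative and fits in a u128, which holds for the sample values but is not guaranteed for every report.

```rust
/// Formats an unsigned fixed-point integer as a decimal string.
/// `decimals` is the number of implied fractional digits (18 in this guide).
/// Assumes `decimals` is at most 38 so that 10^decimals fits in a u128.
fn format_fixed(value: u128, decimals: u32) -> String {
    let scale = 10u128.pow(decimals);
    let whole = value / scale;
    let frac = value % scale;
    if frac == 0 {
        return whole.to_string();
    }
    // Left-pad the fractional part to `decimals` digits, then drop trailing zeros.
    let frac_str = format!("{:0width$}", frac, width = decimals as usize);
    format!("{}.{}", whole, frac_str.trim_end_matches('0'))
}
```

For the sample price, format_fixed(3906157533081673500000, 18) yields "3906.1575330816735". In the guide's loop, a field such as report_data.benchmark_price would first need converting to a u128, for example by parsing its decimal string; that conversion depends on the SDK's numeric type and is not shown here.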

Payload for onchain verification

In this guide, you log and decode the full_report payload to extract the report data. In a production environment, you should verify the data to ensure its integrity and authenticity. Refer to the Verify report data onchain guide.

Explanation

Establishing a WebSocket connection and listening for reports

The WebSocket connection is established in two steps:

  1. Stream::new initializes a new stream instance with your configuration and feed IDs. This function prepares the connection parameters but doesn't establish the connection yet.

  2. stream.listen() establishes the actual WebSocket connection and starts the background tasks that maintain the connection. These tasks handle:

    • Automatic reconnection if the connection is lost
    • Ping/pong messages to keep the connection alive
    • Message queueing and delivery

Decoding a report

As data reports arrive via the WebSocket connection, they are processed in real-time through several steps:

  1. Reading streams: The read method on the Stream object is called within a loop. This asynchronous method:

    • Awaits the next report from the WebSocket connection
    • Handles backpressure automatically
    • Returns a WebSocketReport containing the report data
  2. Decoding reports: Each report is decoded in two stages:

    • decode_full_report parses the raw hexadecimal data, separating the report context (containing metadata) from the report blob
    • ReportDataV3::decode transforms the report blob into a structured format containing:
      • The benchmark price (with 18 decimal places)
      • Bid and ask prices for liquidity-weighted pricing
      • Fee information for onchain verification
      • Timestamp information
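To make the second decoding stage more concrete, the sketch below reads the report blob from the first sample report by hand, using only the standard library. The layout (nine 32-byte big-endian words: stream ID, valid-from, observations timestamp, native fee, link fee, expiry, price, bid, ask) is inferred from the sample output earlier in this guide, not taken from the SDK source, and all names here are hypothetical. ReportDataV3::decode remains the decoder to use in practice.

```rust
/// Fields of a V3 report blob, in the order they appear in the sample output.
#[derive(Debug)]
struct V3Sketch {
    valid_from: u32,
    observations: u32,
    native_fee: u128,
    link_fee: u128,
    expires_at: u32,
    price: u128,
    bid: u128,
    ask: u128,
}

/// Converts a hex string (no 0x prefix) into bytes; None on malformed input.
fn hex_to_bytes(hex: &str) -> Option<Vec<u8>> {
    if hex.len() % 2 != 0 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    (0..hex.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).ok())
        .collect()
}

/// Reads the low 16 bytes of the n-th 32-byte word as a big-endian integer.
/// Assumes the value is non-negative and its upper 16 bytes are zero.
fn word(blob: &[u8], n: usize) -> u128 {
    let mut low = [0u8; 16];
    low.copy_from_slice(&blob[n * 32 + 16..(n + 1) * 32]);
    u128::from_be_bytes(low)
}

/// Decodes the nine-word layout; None if the blob is too short.
fn decode_v3_sketch(blob: &[u8]) -> Option<V3Sketch> {
    if blob.len() < 9 * 32 {
        return None;
    }
    Some(V3Sketch {
        valid_from: word(blob, 1) as u32,
        observations: word(blob, 2) as u32,
        native_fee: word(blob, 3),
        link_fee: word(blob, 4),
        expires_at: word(blob, 5) as u32,
        price: word(blob, 6),
        bid: word(blob, 7),
        ask: word(blob, 8),
    })
}

/// Rebuilds the sample blob by left-padding each value to a 32-byte word.
fn sample_blob() -> Vec<u8> {
    let words = [
        "000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782",
        "675cbe4d",           // valid from
        "675cbe4d",           // observations timestamp
        "17489b06e8fc",       // native fee
        "c7badb615bd14",      // link fee
        "675e0fcd",           // expires at
        "d3c0d34ca0d14d8560", // price
        "d3bda64c97c9f3a3a0", // bid
        "d3c1a08e0cffd77690", // ask
    ];
    let hex: String = words.iter().map(|w| format!("{:0>64}", w)).collect();
    hex_to_bytes(&hex).expect("sample words are valid hex")
}
```

Calling decode_v3_sketch(&sample_blob()) reproduces the timestamps, fees, and price logged for the first report, which is a quick way to check that the decoded fields line up with the raw payload.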

Handling the decoded data

The example demonstrates several best practices for handling the decoded data:

  1. Logging:

    • Uses the tracing crate for structured logging
    • Configures UTC timestamps for consistent time representation
    • Includes both raw report data and decoded fields for debugging
  2. Error handling:

    • Uses Rust's Result type to surface failures explicitly
    • Uses the ? operator to propagate setup and decoding errors out of main
    • Logs stream read errors with the warn! macro and keeps listening
  3. Stream monitoring:

    • Tracks stream statistics through get_stats()
    • Monitors connection status and reconnection attempts
    • Reports message acceptance and deduplication counts
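As an illustration of how the stats might feed a health check, the sketch below mirrors the fields printed in the sample output in a local struct and classifies the connection state. StatsView, Health, assess, and duplicate_ratio are hypothetical names, and the thresholds are assumptions; the value returned by get_stats() would need to be copied into this struct field by field.

```rust
/// Local mirror of the fields shown in the sample `Stream Stats` output.
#[derive(Debug, Clone, Copy)]
struct StatsView {
    accepted: u64,
    deduplicated: u64,
    total_received: u64,
    partial_reconnects: u64,
    full_reconnects: u64,
    configured_connections: u64,
    active_connections: u64,
}

#[derive(Debug, PartialEq)]
enum Health {
    Healthy,      // every configured connection is active
    Degraded,     // some, but not all, connections are active
    Disconnected, // no active connection
}

/// Classifies connection health from the connection counters.
fn assess(stats: &StatsView) -> Health {
    if stats.active_connections == 0 {
        Health::Disconnected
    } else if stats.active_connections < stats.configured_connections {
        Health::Degraded
    } else {
        Health::Healthy
    }
}

/// Share of received reports that were dropped as duplicates (0.0 if none received).
fn duplicate_ratio(stats: &StatsView) -> f64 {
    if stats.total_received == 0 {
        0.0
    } else {
        stats.deduplicated as f64 / stats.total_received as f64
    }
}
```

With the first sample snapshot (one configured and one active connection), assess returns Health::Healthy. In HA mode a non-zero duplicate ratio is expected, because each connection can deliver its own copy of a report.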

The decoded data can be used for further processing, analysis, or display in your application. For production environments, it's recommended to verify the data onchain using the provided full_report payload.
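One simple piece of further processing is to check that a report is still within its validity window before using it. The sketch below compares the Valid From and Expires At timestamps against a Unix time in seconds. The function names are hypothetical, and treating the expiry as exclusive is an assumption rather than documented behavior.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Returns true if `now` (Unix seconds) falls in [valid_from, expires_at).
/// Assumption: the report is no longer usable at exactly `expires_at`.
fn is_within_validity(valid_from: u32, expires_at: u32, now: u64) -> bool {
    u64::from(valid_from) <= now && now < u64::from(expires_at)
}

/// Current Unix time in seconds; falls back to 0 if the clock is before the epoch.
fn unix_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}
```

In the guide's loop, this could be called as is_within_validity(response.report.valid_from_timestamp, report_data.expires_at, unix_now()), assuming both fields convert to u32; the exact field types depend on the SDK version.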
