Stream and decode V4 reports via WebSocket using the Rust SDK

Guide Versions

This guide is available in multiple versions. Choose the one that matches your needs.

In this guide, you'll learn how to use Chainlink Data Streams with the Streams Direct implementation and the Data Streams SDK for Rust to subscribe to real-time V4 reports for Real World Assets (RWA) streams via a WebSocket connection. You'll set up your Rust project, listen for real-time reports from the Data Streams Aggregation Network, decode the report data, and log their attributes to your terminal.

Requirements

  • Git: Make sure you have Git installed. You can check your current version by running git --version in your terminal and download the latest version from the official Git website if necessary.
  • Rust: Make sure you have Rust installed. You can install Rust by following the instructions on the official Rust website.
  • API Credentials: Access to the Streams Direct implementation requires API credentials. If you haven't already, contact us to request mainnet or testnet early access.

Guide

Set up your Rust project

  1. Create a new directory for your project and navigate to it:

    mkdir my-data-streams-project && cd my-data-streams-project
    
  2. Initialize a new Rust project:

    cargo init
    
  3. Add the Data Streams SDK to your Cargo.toml:

    [dependencies]
    data-streams-sdk = { git = "https://github.com/smartcontractkit/data-streams-sdk.git", subdir = "rust/crates/sdk" }
    data-streams-report = { git = "https://github.com/smartcontractkit/data-streams-sdk.git", subdir = "rust/crates/report" }
    tokio = { version = "1.4", features = ["full"] }
    hex = "0.4"
    tracing = "0.1"
    tracing-subscriber = { version = "0.3", features = ["time"] }
    

Establish a WebSocket connection and listen for real-time reports

  1. Replace the contents of src/main.rs with the following code:

    use data_streams_report::feed_id::ID;
    use data_streams_report::report::{decode_full_report, v4::ReportDataV4}; // Import the v4 report schema for RWA streams
    use data_streams_sdk::config::Config;
    use data_streams_sdk::stream::Stream;
    use std::env;
    use std::error::Error;
    use tracing::{info, warn};
    use tracing_subscriber::fmt::time::UtcTime;
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Initialize logging with UTC timestamps
        tracing_subscriber::fmt()
            .with_timer(UtcTime::rfc_3339())
            .with_max_level(tracing::Level::INFO)
            .init();
    
        // Get feed IDs from command line arguments
        let args: Vec<String> = env::args().collect();
        if args.len() < 2 {
            eprintln!("Usage: cargo run [StreamID1] [StreamID2] ...");
            std::process::exit(1);
        }
    
        // Get API credentials from environment variables
        let api_key = env::var("API_KEY").expect("API_KEY must be set");
        let api_secret = env::var("API_SECRET").expect("API_SECRET must be set");
    
        // Parse feed IDs from command line arguments
        let mut feed_ids = Vec::new();
        for arg in args.iter().skip(1) {
            let feed_id = ID::from_hex_str(arg)?;
            feed_ids.push(feed_id);
        }
    
        // Initialize the configuration
        let config = Config::new(
            api_key,
            api_secret,
            "https://api.testnet-dataengine.chain.link".to_string(),
            "wss://ws.testnet-dataengine.chain.link".to_string(),
        )
        .build()?;
    
        // Create and initialize the stream
        let mut stream = Stream::new(&config, feed_ids).await?;
        stream.listen().await?;
    
        info!("WebSocket connection established. Listening for reports...");
    
        // Process incoming reports
        loop {
            match stream.read().await {
                Ok(response) => {
                    info!("\nRaw report data: {:?}\n", response.report);
    
                    // Decode the report
                    let full_report = hex::decode(&response.report.full_report[2..])?;
                    let (_report_context, report_blob) = decode_full_report(&full_report)?;
                    let report_data = ReportDataV4::decode(&report_blob)?;
    
                    // Print decoded report details
                    info!(
                        "\n--- Report Stream ID: {} ---\n\
                         ------------------------------------------\n\
                         Observations Timestamp : {}\n\
                         Benchmark Price       : {}\n\
                         Valid From Timestamp  : {}\n\
                         Expires At           : {}\n\
                         Link Fee             : {}\n\
                         Native Fee           : {}\n\
                         Market Status        : {}\n\
                         ------------------------------------------",
                        response.report.feed_id.to_hex_string(),
                        response.report.observations_timestamp,
                        report_data.price,
                        response.report.valid_from_timestamp,
                        report_data.expires_at,
                        report_data.link_fee,
                        report_data.native_fee,
                        report_data.market_status
                    );
    
                    // Print stream stats
                    info!(
                        "\n--- Stream Stats ---\n{:#?}\n\
                         --------------------------------------------------------------------------------------------------------------------------------------------",
                        stream.get_stats()
                    );
                }
                Err(e) => {
                    warn!("Error reading from stream: {:?}", e);
                }
            }
        }
    }
    
  2. Set up your API credentials as environment variables:

    export API_KEY="<YOUR_API_KEY>"
    export API_SECRET="<YOUR_API_SECRET>"
    

    Replace <YOUR_API_KEY> and <YOUR_API_SECRET> with your API credentials.

  3. For this example, you'll subscribe to the AUD/USD Data Streams RWA stream. This stream ID is 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea. See the RWA Streams page for a complete list of available real-world assets.

    Build and run your application:

    cargo run -- 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea
    

    Expect output similar to the following in your terminal:

    2024-12-13T23:37:17.054816Z  INFO my_data_streams_project: WebSocket connection established. Listening for reports...
    2024-12-13T23:37:17.054876Z  INFO data_streams_sdk::stream::monitor_connection: Received ping: [49]
    2024-12-13T23:37:17.054896Z  INFO data_streams_sdk::stream::monitor_connection: Responding with pong: [49]
    2024-12-13T23:37:17.199112Z  INFO data_streams_sdk::stream::monitor_connection: Received new report from Data Streams Endpoint.
    2024-12-13T23:37:17.199304Z  INFO my_data_streams_project:
    Raw report data: Report { feed_id: 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea, valid_from_timestamp: 1734133037, observations_timestamp: 1734133037, full_report: "0x0006aee203ef23a892e75b579f8c3f26fd933d9ca45de95c2f8ac470f4ddcd76000000000000000000000000000000000000000000000000000000000b42b108000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000026000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea00000000000000000000000000000000000000000000000000000000675cc52d00000000000000000000000000000000000000000000000000000000675cc52d0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000675e16ad00000000000000000000000000000000000000000000000008d4182644500000000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000021521eae32097ed41bc3dad69f365da1979d155b574cc84e4dfb9e7970821d6bc40fac4c300b09b7cfae106f1008b4bec83edd27f20c5a3ad6b2c736e0765ecc500000000000000000000000000000000000000000000000000000000000000024941c68ad5a7d69e92fc927633634cd6cf478f7c11e3c3408341b462dac292ce66cc18bf09c30ad1df589dbfc0a19172a4719bb51efe83450ce14d959bccf541" }
    
    2024-12-13T23:37:17.199585Z  INFO my_data_streams_project:
    --- Report Stream ID: 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea ---
    ------------------------------------------
    Observations Timestamp : 1734133037
    Benchmark Price       : 636160000000000000
    Valid From Timestamp  : 1734133037
    Expires At           : 1734219437
    Link Fee             : 0
    Native Fee           : 0
    Market Status        : 1
    ------------------------------------------
    2024-12-13T23:37:17.199633Z  INFO my_data_streams_project:
    --- Stream Stats ---
    StatsSnapshot {
        accepted: 1,
        deduplicated: 0,
        total_received: 1,
        partial_reconnects: 0,
        full_reconnects: 0,
        configured_connections: 1,
        active_connections: 1,
    }
    --------------------------------------------------------------------------------------------------------------------------------------------
    2024-12-13T23:37:18.215222Z  INFO data_streams_sdk::stream::monitor_connection: Received new report from Data Streams Endpoint.
    2024-12-13T23:37:18.21587Z  INFO my_data_streams_project:
    Raw report data: Report { feed_id: 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea, valid_from_timestamp: 1734133038, observations_timestamp: 1734133038, full_report: "0x0006aee203ef23a892e75b579f8c3f26fd933d9ca45de95c2f8ac470f4ddcd76000000000000000000000000000000000000000000000000000000000b42b10c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000026000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea00000000000000000000000000000000000000000000000000000000675cc52e00000000000000000000000000000000000000000000000000000000675cc52e0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000675e16ae00000000000000000000000000000000000000000000000008d4182644500000000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000026a76e4abe53b8f3f7f0ca1628335a0da840d28502ea2c44e67951b17b8cc74da781aa786d5503c09043c29d0db71db2b0de3b5b5302a9050b7acdf93fdf4a36f00000000000000000000000000000000000000000000000000000000000000027d06597171eceea257fc1eaa222d4b47a0b5d227e8bf746031e9061806e55b2778fa8c6e10a574d7f9a4e192090f71769468eb3e626b3354e6cdaadff120e170" }
    
    2024-12-13T23:37:18.216946Z  INFO my_data_streams_project:
    --- Report Stream ID: 0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea ---
    ------------------------------------------
    Observations Timestamp : 1734133038
    Benchmark Price       : 636160000000000000
    Valid From Timestamp  : 1734133038
    Expires At           : 1734219438
    Link Fee             : 0
    Native Fee           : 0
    Market Status        : 1
    ------------------------------------------
    2024-12-13T23:37:18.21714Z  INFO my_data_streams_project:
    --- Stream Stats ---
    StatsSnapshot {
        accepted: 2,
        deduplicated: 0,
        total_received: 2,
        partial_reconnects: 0,
        full_reconnects: 0,
        configured_connections: 1,
        active_connections: 1,
    }
    
    [...]
    

The example above demonstrates streaming data from a single RWA stream. For production environments, especially when subscribing to multiple streams, it's recommended to enable High Availability (HA) mode. This can be achieved by:

  1. Adding multiple WebSocket endpoints in the configuration:

    "wss://ws.testnet-dataengine.chain.link,wss://ws.testnet-dataengine.chain.link"
    
  2. Enabling HA mode in the configuration:

    use data_streams_sdk::config::WebSocketHighAvailability;
    // ...
    .with_ws_ha(WebSocketHighAvailability::Enabled)
    

When HA mode is enabled and multiple WebSocket origins are provided, the Stream will maintain concurrent connections to different instances. This ensures high availability, fault tolerance, and minimizes the risk of report gaps.

You can subscribe to multiple streams by providing additional stream IDs as command-line arguments:

cargo run -- \
  0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7ea \
  0x0004f41f3d47d2c15f817309d905d4508e809c98b1eafc7b6cdc2a6c8ec23f3f

This will subscribe to both AUD/USD and EUR/USD streams.

Decoded report details

The decoded report details include:

AttributeValueDescription
Stream ID0x000434a5b30cafe7e853832a458ea1591dc2f5fb5e4cf80b9979b8248065a7eaThe unique identifier for the stream. In this example, the stream is for AUD/USD.
Observations Timestamp1734133037The timestamp indicating when the data was captured.
Benchmark Price636160000000000000The observed price in the report, with 18 decimals. For readability: 0.63616 USD per AUD.
Valid From Timestamp1734133037The start validity timestamp for the report, indicating when the data becomes relevant.
Expires At1734219437The expiration timestamp of the report, indicating the point at which the data becomes outdated.
Link Fee0The fee to pay in LINK tokens for the onchain verification of the report data. With 18 decimals. Note: This example fee is not indicative of actual fees.
Native Fee0The fee to pay in the native blockchain token (e.g., ETH on Ethereum) for the onchain verification of the report data. With 18 decimals. Note: This example fee is not indicative of actual fees.
Market Status1The DON's consensus on whether the market is currently open. Possible values: 0 (Unknown), 1 (Closed), 2 (Open). In this case, the market is closed.

Payload for onchain verification

In this guide, you log and decode the full_report payload to extract the report data. In a production environment, you should verify the data onchain to ensure its integrity and authenticity. Refer to the Verify report data onchain guide.

Explanation

Establishing a WebSocket connection and listening for reports

The WebSocket connection is established in two steps:

  1. Stream::new initializes a new stream instance with your configuration and feed IDs. This function prepares the connection parameters but doesn't establish the connection yet.

  2. stream.listen() establishes the actual WebSocket connection and starts the background tasks that maintain the connection. These tasks handle:

    • Automatic reconnection if the connection is lost
    • Ping/pong messages to keep the connection alive
    • Message queueing and delivery

Decoding a report

As data reports arrive via the WebSocket connection, they are processed in real-time through several steps:

  1. Reading streams: The read method on the Stream object is called within a loop. This asynchronous method:

    • Awaits the next report from the WebSocket connection
    • Handles backpressure automatically
    • Returns a WebSocketReport containing the report data
  2. Decoding reports: Each report is decoded in two stages:

    • decode_full_report parses the raw hexadecimal data, separating the report context (containing metadata) from the report blob
    • ReportDataV4::decode transforms the report blob into a structured format containing:
      • The benchmark price (with 18 decimal places)
      • Market status (indicating if the market is open, closed, or unknown)
      • Fee information for onchain verification
      • Timestamp information

Handling the decoded data

The example demonstrates several best practices for handling the decoded data:

  1. Logging:

    • Uses the tracing crate for structured logging
    • Configures UTC timestamps for consistent time representation
    • Includes both raw report data and decoded fields for debugging
  2. Error handling:

    • Uses Rust's Result type for robust error handling
    • Implements the ? operator for clean error propagation
    • Logs errors with appropriate context using warn! macro
  3. Stream monitoring:

    • Tracks stream statistics through get_stats()
    • Monitors connection status and reconnection attempts
    • Reports message acceptance and deduplication counts

The decoded data can be used for further processing, analysis, or display in your application. For production environments, it's recommended to verify the data onchain using the provided full_report payload.

What's next

Get the latest Chainlink content straight to your inbox.