Stream and Decode reports using the Rust SDK (WebSocket)

Guide Versions

This guide is available in multiple versions. Choose the one that matches your needs.

In this guide, you'll learn how to use the Data Streams SDK for Rust to stream and decode DataLink feeds from the Aggregation Network. You'll set up your Rust project, listen for real-time reports, decode them, and log their attributes.

Requirements

  • Rust: Make sure you have Rust installed. You can install Rust by following the instructions on the official Rust website.
  • API Credentials: Access to DataLink requires API credentials to connect to the Aggregation Network. If you haven't already, contact us to request access.

Guide

Set up your Rust project

  1. Create a new directory for your project and navigate to it:

    mkdir my-datalink-project && cd my-datalink-project
    
  2. Initialize a new Rust project:

    cargo init
    
  3. Add the following dependencies to your Cargo.toml file:

    [dependencies]
    chainlink-data-streams-sdk = "1.0.0"
    chainlink-data-streams-report = "1.0.0"
    tokio = { version = "1.4", features = ["full"] }
    hex = "0.4"
    tracing = "0.1"
    tracing-subscriber = { version = "0.3", features = ["time"] }
    

    Note: The tracing feature is required for logging functionality.

Understanding Report Schema Versions

Data Providers may use different report schema versions. The schema version determines the structure of the data returned by the feed and affects how you should decode the report.

  1. Import the appropriate schema version in your code (e.g., v4).
  2. Use that version when decoding the report with report.Decode[v4.Data]().

Different schema versions have different fields and structures.

In this example, we're using report schema v4 for the EUR/USD feed, but your implementation should match the schema version specified by your Data Provider.

Establish a WebSocket connection and listen for real-time reports

  1. Replace the contents of src/main.rs with the following code:

    use chainlink_data_streams_report::feed_id::ID;
    use chainlink_data_streams_report::report::{ decode_full_report, v4::ReportDataV4 };
    use chainlink_data_streams_sdk::config::Config;
    use chainlink_data_streams_sdk::stream::Stream;
    use std::env;
    use std::error::Error;
    use tracing::{ info, warn };
    use tracing_subscriber::fmt::time::UtcTime;
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
       // Initialize logging with UTC timestamps
       tracing_subscriber
          ::fmt()
          .with_timer(UtcTime::rfc_3339())
          .with_max_level(tracing::Level::INFO)
          .init();
    
       // Get feed IDs from command line arguments
       let args: Vec<String> = env::args().collect();
       if args.len() < 2 {
          eprintln!("Usage: cargo run [FeedID1] [FeedID2] ...");
          std::process::exit(1);
       }
    
       // Get API credentials from environment variables
       let api_key = env::var("API_KEY").expect("API_KEY must be set");
       let api_secret = env::var("API_SECRET").expect("API_SECRET must be set");
    
       // Parse feed IDs from command line arguments
       let mut feed_ids = Vec::new();
       for arg in args.iter().skip(1) {
          let feed_id = ID::from_hex_str(arg)?;
          feed_ids.push(feed_id);
       }
    
       // Initialize the configuration
       let config = Config::new(
          api_key,
          api_secret,
          "https://api.testnet-dataengine.chain.link".to_string(),
          "wss://ws.testnet-dataengine.chain.link".to_string()
       ).build()?;
    
       // Create and initialize the stream
       let mut stream = Stream::new(&config, feed_ids).await?;
       stream.listen().await?;
    
       info!("WebSocket connection established. Listening for reports...");
    
       // Process incoming reports
       loop {
          match stream.read().await {
                Ok(response) => {
                   info!("\nRaw report data: {:?}\n", response.report);
    
                   // Decode the report
                   let full_report = hex::decode(&response.report.full_report[2..])?;
                   let (_report_context, report_blob) = decode_full_report(&full_report)?;
                   let report_data = ReportDataV4::decode(&report_blob)?;
    
                   // Print decoded report details
                   info!(
                      "\n--- Report Feed ID: {} ---\n\
                      ------------------------------------------\n\
                      Observations Timestamp : {}\n\
                      Benchmark Price       : {}\n\
                      Valid From Timestamp  : {}\n\
                      Expires At           : {}\n\
                      Link Fee             : {}\n\
                      Native Fee           : {}\n\
                      Market Status        : {}\n\
                      ------------------------------------------",
                      response.report.feed_id.to_hex_string(),
                      response.report.observations_timestamp,
                      report_data.price,
                      response.report.valid_from_timestamp,
                      report_data.expires_at,
                      report_data.link_fee,
                      report_data.native_fee,
                      report_data.market_status
                   );
    
                   // Print stream stats
                   info!(
                      "\n--- Stream Stats ---\n{:#?}\n\
                         --------------------------------------------------------------------------------------------------------------------------------------------",
                      stream.get_stats()
                   );
                }
                Err(e) => {
                   warn!("Error reading from stream: {:?}", e);
                }
          }
       }
    
       // Note: In a production environment, you should implement proper cleanup
       // by calling stream.close() when the application is terminated.
       // For example:
       //
       // tokio::select! {
       //     _ = tokio::signal::ctrl_c() => {
       //         info!("Received shutdown signal");
       //         stream.close().await?;
       //     }
       //     result = stream.read() => {
       //         // Process result
       //     }
       // }
    }
    
  2. Set up your API credentials as environment variables:

    export API_KEY="<YOUR_API_KEY>"
    export API_SECRET="<YOUR_API_SECRET>"
    

    Replace <YOUR_API_KEY> and <YOUR_API_SECRET> with your API credentials.

  3. For this example, you'll subscribe to the EUR/USD DataLink feed on testnet. This feed ID is 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce.

    Build and run your application:

    cargo run -- 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce
    

    Expect output similar to the following in your terminal:

    2025-06-03T16:18:20.232313Z  INFO my_data_link_project: WebSocket connection established. Listening for reports...
    2025-06-03T16:18:20.232481Z  INFO chainlink_data_streams_sdk::stream::monitor_connection: Received ping: [49]
    2025-06-03T16:18:20.232534Z  INFO chainlink_data_streams_sdk::stream::monitor_connection: Responding with pong: [49]
    2025-06-03T16:18:20.550416Z  INFO chainlink_data_streams_sdk::stream::monitor_connection: Received new report from Data Streams Endpoint.
    2025-06-03T16:18:20.550857Z  INFO my_data_link_project:
    Raw report data: Report { feed_id: 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce, valid_from_timestamp: 1748967500, observations_timestamp: 1748967500, full_report: "0x00090d9e8d96765a0c49e03a6ae05c82e8f8de70cf179baa632f18313e54bd6900000000000000000000000000000000000000000000000000000000004165ff000000000000000000000000000000000000000000000000000000030000000100000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce00000000000000000000000000000000000000000000000000000000683f204c00000000000000000000000000000000000000000000000000000000683f204c00000000000000000000000000000000000000000000000000006f12bdac46c0000000000000000000000000000000000000000000000000004f29241147b58e000000000000000000000000000000000000000000000000000000006866ad4c0000000000000000000000000000000000000000000000000fcb2a7202a220000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000291e7ab37d47a051d06bf9a17e743a30305560fa1ed63eb1e94530b9ff8e00998f2dc9e4876e60bde9f43fbbeb3a1c98bca91b71c98f25a329aa4843a1cdf5acc00000000000000000000000000000000000000000000000000000000000000023d6c77dce452fedcb47942020c574f291fdb259c64e2a42cfce0fbe2f2df092a3703bb5e167b80f388c323ec1d3cf9d298bc077d903a4297671c395e6d34b550" }
    
    2025-06-03T16:18:20.551775Z  INFO my_data_link_project:
    --- Report Feed ID: 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce ---
    ------------------------------------------
    Observations Timestamp : 1748967500
    Benchmark Price       : 1138050000000000000
    Valid From Timestamp  : 1748967500
    Expires At           : 1751559500
    Link Fee             : 22281758045615502
    Native Fee           : 122126282278592
    Market Status        : 2
    ------------------------------------------
    2025-06-03T16:18:20.551946Z  INFO my_data_link_project:
    --- Stream Stats ---
    StatsSnapshot {
       accepted: 1,
       deduplicated: 0,
       total_received: 1,
       partial_reconnects: 0,
       full_reconnects: 0,
       configured_connections: 1,
       active_connections: 1,
    }
    --------------------------------------------------------------------------------------------------------------------------------------------
    2025-06-03T16:18:21.503569Z  INFO chainlink_data_streams_sdk::stream::monitor_connection: Received new report from Data Streams Endpoint.
    2025-06-03T16:18:21.503786Z  INFO my_data_link_project:
    Raw report data: Report { feed_id: 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce, valid_from_timestamp: 1748967501, observations_timestamp: 1748967501, full_report: "0x00090d9e8d96765a0c49e03a6ae05c82e8f8de70cf179baa632f18313e54bd690000000000000000000000000000000000000000000000000000000000416602000000000000000000000000000000000000000000000000000000030000000100000000000000000000000000000000000000000000000000000000000000e000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000260010100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce00000000000000000000000000000000000000000000000000000000683f204d00000000000000000000000000000000000000000000000000000000683f204d00000000000000000000000000000000000000000000000000006f120e9d6f70000000000000000000000000000000000000000000000000004f2899581124ed000000000000000000000000000000000000000000000000000000006866ad4d0000000000000000000000000000000000000000000000000fcb2a7202a2200000000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002bfc1839b35307881f3bca8fb4d5f08dc3da6d60f8ed43e31b36bbccbdc8e1abb9f8bf045a6c3cb96fba8536e81b3e92a54b4762cd1a85ad552cf4c664715c0bd00000000000000000000000000000000000000000000000000000000000000023b0c7ad0fdfdb598d53fee4d7026957c1c30e8c9056a267dc40d8ee8000168a72d6a2349a71f41a200b8f600fbedfb5b4c4ebf69ac86a794748268a9b3972594" }
    
    2025-06-03T16:18:21.504481Z  INFO my_data_link_project:
    --- Report Feed ID: 0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ce ---
    ------------------------------------------
    Observations Timestamp : 1748967501
    Benchmark Price       : 1138050000000000000
    Valid From Timestamp  : 1748967501
    Expires At           : 1751559501
    Link Fee             : 22281162232767725
    Native Fee           : 122123345293168
    Market Status        : 2
    ------------------------------------------
    2025-06-03T16:18:21.504537Z  INFO my_data_link_project:
    --- Stream Stats ---
    StatsSnapshot {
       accepted: 2,
       deduplicated: 0,
       total_received: 2,
       partial_reconnects: 0,
       full_reconnects: 0,
       configured_connections: 1,
       active_connections: 1,
    }
    [...]
    

The example above demonstrates streaming data from a single crypto stream. For production environments, especially when subscribing to multiple streams, it's recommended to enable High Availability (HA) mode. This can be achieved by:

  1. Adding multiple WebSocket endpoints in the configuration:

    "wss://ws.testnet-dataengine.chain.link,wss://ws.testnet-dataengine.chain.link"
    
  2. Enabling HA mode in the configuration:

    use chainlink_data_streams_sdk::config::WebSocketHighAvailability;
    // ...
    .with_ws_ha(WebSocketHighAvailability::Enabled)
    

When HA mode is enabled and multiple WebSocket origins are provided, the Stream will maintain concurrent connections to different instances. This ensures high availability, fault tolerance, and minimizes the risk of report gaps.

Decoded report details

The decoded report details include:

AttributeValueDescription
Feed ID0x0004b9905d8337c34e00f8dbe31619428bac5c3937e73e6af75c71780f1770ceThe unique identifier for the feed. In this example, the feed is for EUR/USD.
Observations Timestamp1748967501The timestamp indicating when the data was captured.
Benchmark Price1138050000000000000The observed price in the report, with 18 decimals. For readability: 1.138050000000000000 USD per EUR.
Valid From Timestamp1748967501The start validity timestamp for the report, indicating when the data becomes relevant.
Expires At1751559501The expiration timestamp of the report, indicating the point at which the data becomes outdated.
Link Fee22281162232767725The fee to pay in LINK tokens for the onchain verification of the report data. With 18 decimals. For readability: 0.022281162232767725 LINK. Note: This example fee is not indicative of actual fees.
Native Fee122123345293168The fee to pay in the native blockchain token (e.g., ETH on Ethereum) for the onchain verification of the report data. With 18 decimals. Note: This example fee is not indicative of actual fees.
Market Status2The market status for the feed. In this example, 2 indicates the market is Open.

Payload for onchain verification

In this guide, you log and decode the full_report payload to extract the report data. In a production environment, you should verify the data to ensure its integrity and authenticity. Refer to the Verify report data onchain guide.

Adapting code for different report schema versions

When working with different DataLink providers, you'll need to adapt your code to handle the specific report schema version they use:

  1. Import the correct schema version module. Examples:

    • For v4 schema (as used in this example):
    use chainlink_data_streams_report::report::{ decode_full_report, v4::ReportDataV4 };
    
    • For v3 schema:
    use chainlink_data_streams_report::report::{ decode_full_report, v3::ReportDataV3 };
    
  2. Update the decode function to use the correct schema version. Examples:

    • For v4 schema (as used in this example):
    let report_data = ReportDataV4::decode(&report_blob)?;
    
    • For v3 schema:
    let report_data = ReportDataV3::decode(&report_blob)?;
    
  3. Access fields according to the schema version structure.

Explanation

Establishing a WebSocket connection and listening for reports

The WebSocket connection is established in two steps:

  1. Stream::new initializes a new stream instance with your configuration and feed IDs. This function prepares the connection parameters but doesn't establish the connection yet.

  2. stream.listen() establishes the actual WebSocket connection and starts the background tasks that maintain the connection. These tasks handle:

    • Automatic reconnection if the connection is lost
    • Ping/pong messages to keep the connection alive
    • Message queueing and delivery

Decoding a report

As data reports arrive via the WebSocket connection, they are processed in real-time through several steps:

  1. Reading streams: The read method on the Stream object is called within a loop. This asynchronous method:

    • Awaits the next report from the WebSocket connection
    • Handles backpressure automatically
    • Returns a WebSocketReport containing the report data
  2. Decoding reports: Each report is decoded in two stages:

    • decode_full_report parses the raw hexadecimal data, separating the report context (containing metadata) from the report blob
    • ReportDataV4::decode transforms the report blob into a structured format containing:
      • The benchmark price (with 18 decimal places)
      • Fee information for onchain verification (LINK and native fees)
      • Expiration timestamp
      • Market status indicator

Handling the decoded data

The example demonstrates several best practices for handling the decoded data:

  1. Logging:

    • Uses the tracing crate for structured logging
    • Configures UTC timestamps for consistent time representation
    • Includes both raw report data and decoded fields for debugging
  2. Error handling:

    • Uses Rust's Result type for robust error handling
    • Implements the ? operator for clean error propagation
    • Logs errors with appropriate context using warn! macro
  3. Stream monitoring:

    • Tracks stream statistics through get_stats()
    • Monitors connection status and reconnection attempts
    • Reports message acceptance and deduplication counts

The decoded data can be used for further processing, analysis, or display in your application. For production environments, it's recommended to verify the data onchain using the provided full_report payload.

Get the latest Chainlink content straight to your inbox.