SDK Reference: Consensus & Aggregation

Aggregation reduces the results returned by many individual nodes to a single, reliable value. This aggregated value is what the DON (Decentralized Oracle Network) reaches consensus on. When you run code on individual nodes with runtime.runInNodeMode(), you must provide an aggregation strategy that tells the DON how to produce this single, trustworthy outcome. You do this with a ConsensusAggregation.

ConsensusAggregation<T, U>

This is a generic type passed as the second argument to runtime.runInNodeMode(). It defines the aggregation strategy and an optional default value to be used if the node-level execution fails.

There are two primary ways to specify an aggregation method:

  1. Using built-in functions: For simple types, use functions like consensusMedianAggregation().
  2. Using field-based aggregation: For complex types (objects), use ConsensusAggregationByFields().

1. Built-in aggregation functions

These functions are used for simple, single-value aggregations.

consensusMedianAggregation<T>()

Computes the median of numeric results from all nodes.

Supported Types (T): number, bigint, Date

Usage:

import { consensusMedianAggregation, type Runtime, type NodeRuntime } from "@chainlink/cre-sdk"

const fetchPrice = (nodeRuntime: NodeRuntime<Config>): bigint => {
  // Fetch price from API
  return 100n
}

const onTrigger = (runtime: Runtime<Config>): string => {
  const price = runtime.runInNodeMode(fetchPrice, consensusMedianAggregation<bigint>())().result()

  runtime.log(`Median price: ${price}`)
  return "Success"
}
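
To make the effect of median aggregation concrete, the following is a minimal local sketch of the reduction itself. It does not use the SDK: medianOf is a hypothetical helper, it handles plain numbers only, and its choice of the lower middle value for an even number of observations is an assumption, since the DON's tie-breaking behavior is not described here.

```typescript
// Local illustration only: medianOf is a hypothetical helper, not an SDK export.
function medianOf(observations: number[]): number {
  if (observations.length === 0) {
    throw new Error("no observations to aggregate")
  }
  // Sort a copy numerically so the caller's array is left untouched.
  const sorted = observations.slice().sort((a, b) => a - b)
  // Assumption: for an even count, take the lower of the two middle values.
  return sorted[Math.floor((sorted.length - 1) / 2)]
}

// Five nodes report a price; one node returns an outlier.
const reportedPrices = [101, 99, 100, 100, 5000]
const medianPrice = medianOf(reportedPrices) // 100: the outlier has no effect
```

The median is a natural fit for numeric data feeds because a minority of faulty or manipulated observations cannot move the result outside the range reported by the honest nodes.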

consensusIdenticalAggregation<T>()

Ensures that a sufficient majority of nodes (a Byzantine Quorum) return the exact same value.

Supported Types (T): Any primitive TypeScript type (string, boolean, number, bigint), or objects composed entirely of these types.

Usage:

import { consensusIdenticalAggregation, type Runtime, type NodeRuntime } from "@chainlink/cre-sdk"

const fetchBlockHash = (nodeRuntime: NodeRuntime<Config>): string => {
  // Fetch block hash from RPC
  return "0xabc123..."
}

const onTrigger = (runtime: Runtime<Config>): string => {
  const blockHash = runtime.runInNodeMode(fetchBlockHash, consensusIdenticalAggregation<string>())().result()

  runtime.log(`Block hash: ${blockHash}`)
  return "Success"
}
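
The quorum idea can be sketched locally as a vote count. This is an illustration under stated assumptions, not the SDK's implementation: identicalWithQuorum is a hypothetical helper, it handles primitives only, and the quorum size is passed in explicitly because the threshold the DON uses is not specified here (the example uses 3 of 4, the common 2f+1 of 3f+1 pattern).

```typescript
// Local illustration only: identicalWithQuorum is a hypothetical helper.
type Primitive = string | number | boolean

function identicalWithQuorum<T extends Primitive>(observations: T[], quorum: number): T {
  for (const candidate of observations) {
    // Count how many nodes reported exactly this value.
    const votes = observations.filter((value) => value === candidate).length
    if (votes >= quorum) {
      return candidate
    }
  }
  throw new Error("no value reached the required quorum")
}

// Four nodes report a block hash; one disagrees. Assumed quorum: 3 of 4.
const reportedHashes = ["0xabc", "0xabc", "0xdef", "0xabc"]
const agreedHash = identicalWithQuorum(reportedHashes, 3) // "0xabc"
```

Unlike the median, this strategy fails outright when the nodes split, which is the desired behavior for values such as hashes where "close" has no meaning.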

consensusCommonPrefixAggregation<T>()

Computes the longest common prefix of the arrays returned by all nodes, that is, the longest run of leading elements that the arrays share. This is useful when nodes agree on the start of a list but may differ toward its end.

Supported Types (T): Any array of a type supported by consensusIdenticalAggregation (e.g., string[], number[]).

Usage:

import { consensusCommonPrefixAggregation, type Runtime, type NodeRuntime } from "@chainlink/cre-sdk"

const fetchBlockHeaders = (nodeRuntime: NodeRuntime<Config>): string[] => {
  // Fetch block headers for a chain fork
  return ["0xabc...", "0xdef...", "0x123..."]
}

const onTrigger = (runtime: Runtime<Config>): string => {
  const headers = runtime.runInNodeMode(fetchBlockHeaders, consensusCommonPrefixAggregation<string>())().result()

  runtime.log(`Common prefix length: ${headers.length}`)
  return "Success"
}
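
As a rough local sketch of what a longest common prefix over several arrays means, the helper below compares each node's list against the first one. longestCommonPrefix is a hypothetical name, and the sketch assumes every list must agree on an element for it to be kept; whether the DON tolerates a minority of divergent nodes is not stated here.

```typescript
// Local illustration only: longestCommonPrefix is a hypothetical helper.
function longestCommonPrefix<T>(lists: T[][]): T[] {
  const first = lists[0]
  if (first === undefined) {
    return []
  }
  let length = first.length
  for (const list of lists) {
    // Shrink the shared length to the first position where this list differs.
    let i = 0
    while (i < length && i < list.length && list[i] === first[i]) {
      i++
    }
    length = i
  }
  return first.slice(0, length)
}

// Three nodes agree on the first two headers, then diverge or stop early.
const sharedHeaders = longestCommonPrefix([
  ["0xa", "0xb", "0xc"],
  ["0xa", "0xb", "0xd"],
  ["0xa", "0xb"],
]) // ["0xa", "0xb"]
```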

consensusCommonSuffixAggregation<T>()

Computes the longest common suffix of the arrays returned by all nodes, that is, the longest run of trailing elements that the arrays share. This is useful when nodes agree on the end of a list but may differ toward its start.

Supported Types (T): Any array of a type supported by consensusIdenticalAggregation (e.g., string[], number[]).

Usage:

import { consensusCommonSuffixAggregation, type Runtime, type NodeRuntime } from "@chainlink/cre-sdk"

const fetchRecentTransactions = (nodeRuntime: NodeRuntime<Config>): string[] => {
  // Fetch recent transaction IDs
  return ["0x111...", "0x222...", "0x333..."]
}

const onTrigger = (runtime: Runtime<Config>): string => {
  const recentTxs = runtime
    .runInNodeMode(fetchRecentTransactions, consensusCommonSuffixAggregation<string>())()
    .result()

  runtime.log(`Common suffix length: ${recentTxs.length}`)
  return "Success"
}
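
The suffix case mirrors the prefix case but aligns the lists at their last element. The sketch below is a local illustration with a hypothetical helper name, and it again assumes that every list must agree on an element for it to be kept.

```typescript
// Local illustration only: longestCommonSuffix is a hypothetical helper.
function longestCommonSuffix<T>(lists: T[][]): T[] {
  const first = lists[0]
  if (first === undefined) {
    return []
  }
  let length = first.length
  for (const list of lists) {
    // Walk backwards from the end of both lists while the elements match.
    let matched = 0
    while (
      matched < length &&
      matched < list.length &&
      list[list.length - 1 - matched] === first[first.length - 1 - matched]
    ) {
      matched++
    }
    length = matched
  }
  return first.slice(first.length - length)
}

// Nodes saw different older transactions but agree on the two most recent.
const sharedRecent = longestCommonSuffix([
  ["0x111", "0x222", "0x333"],
  ["0x000", "0x222", "0x333"],
  ["0x222", "0x333"],
]) // ["0x222", "0x333"]
```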

2. Field-based aggregation for objects

For objects with multiple fields, the recommended approach is to use ConsensusAggregationByFields. This function lets you specify a separate aggregation strategy for each field of your return type.

ConsensusAggregationByFields<T>(fields)

Creates a consensus aggregation strategy by specifying how to aggregate each field of an object.

Parameters:

  • fields: An object in which each key corresponds to a field of your type T, and each value is a field aggregation function.

Field Aggregation Functions:

Function        | Description                                                  | Compatible Field Types
median()        | Computes the median of the field's value across all nodes   | number, bigint, Date
identical()     | Ensures the field's value is identical across all nodes     | Primitives, objects
commonPrefix()  | Finds the longest common prefix for an array from all nodes | Arrays (string[], number[])
commonSuffix()  | Finds the longest common suffix for an array from all nodes | Arrays
ignore()        | This field will be ignored during consensus                 | Any

Usage:

import {
  cre, // needed for cre.capabilities.HTTPClient below
  ConsensusAggregationByFields,
  median,
  identical,
  type Runtime,
  type HTTPSendRequester,
} from "@chainlink/cre-sdk"

type ReserveInfo = {
  lastUpdated: Date
  totalReserve: number
  source: string
}

const fetchReserveData = (sendRequester: HTTPSendRequester, config: Config): ReserveInfo => {
  const response = sendRequester.sendRequest({ url: config.apiUrl }).result()
  const data = JSON.parse(response.body.toString())

  return {
    lastUpdated: new Date(data.timestamp * 1000),
    totalReserve: data.reserve,
    source: data.source,
  }
}

const onTrigger = (runtime: Runtime<Config>): string => {
  const httpClient = new cre.capabilities.HTTPClient()

  const reserveInfo = httpClient
    .sendRequest(
      runtime,
      fetchReserveData,
      ConsensusAggregationByFields<ReserveInfo>({
        lastUpdated: median, // Use median for timestamp
        totalReserve: median, // Use median for reserve amount
        source: identical, // Ensure source is identical across nodes
      })
    )(runtime.config)
    .result()

  runtime.log(`Reserve: ${reserveInfo.totalReserve}`)
  return "Success"
}
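
Conceptually, field-based aggregation applies an independent reduction to each field across all node results. The sketch below illustrates that idea locally and is not how the SDK implements it: Reducer, FieldReducers, aggregateByFields, medianNumber, and identicalString are all hypothetical names, and identicalString requires unanimity for simplicity rather than a quorum.

```typescript
// Local illustration only: none of these names are SDK exports.
// A reducer collapses one field's per-node values into a single value.
type Reducer<V> = (values: V[]) => V
type FieldReducers<T> = { [K in keyof T]: Reducer<T[K]> }

function aggregateByFields<T>(observations: T[], reducers: FieldReducers<T>): T {
  if (observations.length === 0) {
    throw new Error("no observations to aggregate")
  }
  const result = {} as T
  for (const key in reducers) {
    // Collect this field from every node, then apply the field's own reducer.
    result[key] = reducers[key](observations.map((o) => o[key]))
  }
  return result
}

// Assumption: lower middle value for an even number of observations.
const medianNumber: Reducer<number> = (values) => {
  const sorted = values.slice().sort((a, b) => a - b)
  return sorted[Math.floor((sorted.length - 1) / 2)]
}

// Simplification: every node must report the same value.
const identicalString: Reducer<string> = (values) => {
  if (values.some((v) => v !== values[0])) {
    throw new Error("values differ across nodes")
  }
  return values[0]
}

type ReserveSample = { totalReserve: number; source: string }

const nodeSamples: ReserveSample[] = [
  { totalReserve: 1000, source: "api" },
  { totalReserve: 1010, source: "api" },
  { totalReserve: 990, source: "api" },
]

const aggregatedSample = aggregateByFields<ReserveSample>(nodeSamples, {
  totalReserve: medianNumber,
  source: identicalString,
}) // { totalReserve: 1000, source: "api" }
```

Treating fields independently lets noisy numeric fields tolerate small differences between nodes while still requiring exact agreement on fields such as identifiers.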

Complete Example

Here's a complete example demonstrating both simple and field-based aggregation:

import {
  cre,
  Runner,
  consensusMedianAggregation,
  ConsensusAggregationByFields,
  median,
  identical,
  type Runtime,
  type NodeRuntime,
  type HTTPSendRequester,
  type CronPayload,
} from "@chainlink/cre-sdk"

type Config = {
  apiUrl: string
}

type PriceData = {
  price: bigint
  timestamp: bigint
  source: string
}

// Simple aggregation example
const fetchSimplePrice = (nodeRuntime: NodeRuntime<Config>): bigint => {
  const httpClient = new cre.capabilities.HTTPClient()
  const response = httpClient.sendRequest(nodeRuntime, { url: nodeRuntime.config.apiUrl }).result()
  const data = JSON.parse(response.body.toString())
  return BigInt(data.price)
}

// Field-based aggregation example
const fetchPriceData = (sendRequester: HTTPSendRequester, config: Config): PriceData => {
  const response = sendRequester.sendRequest({ url: config.apiUrl }).result()
  const data = JSON.parse(response.body.toString())

  return {
    price: BigInt(data.price),
    timestamp: BigInt(data.timestamp),
    source: data.source,
  }
}

const onCronTrigger = (runtime: Runtime<Config>, payload: CronPayload): string => {
  // Example 1: Simple median aggregation
  const simplePrice = runtime.runInNodeMode(fetchSimplePrice, consensusMedianAggregation<bigint>())().result()

  runtime.log(`Simple median price: ${simplePrice}`)

  // Example 2: Field-based aggregation
  const httpClient = new cre.capabilities.HTTPClient()
  const priceData = httpClient
    .sendRequest(
      runtime,
      fetchPriceData,
      ConsensusAggregationByFields<PriceData>({
        price: median, // Median of price values
        timestamp: median, // Median of timestamps
        source: identical, // Must be identical across nodes
      })
    )(runtime.config)
    .result()

  runtime.log(`Aggregated price: ${priceData.price} from ${priceData.source}`)

  return "Price data aggregated successfully"
}

const initWorkflow = (config: Config) => {
  const cron = new cre.capabilities.CronCapability()

  return [cre.handler(cron.trigger({ schedule: "0 */5 * * * *" }), onCronTrigger)]
}

export async function main() {
  const runner = await Runner.newRunner<Config>()
  await runner.run(initWorkflow)
}

main()

Default Values

You can specify a default value to be used if node-level execution fails:

const price = runtime.runInNodeMode(fetchPrice, consensusMedianAggregation<bigint>().withDefault(0n))().result()

If all nodes fail or consensus cannot be reached, the default value (0n in this example) will be used instead.
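
The fallback behavior described above can be pictured with a small local sketch. aggregateOrDefault is a hypothetical helper that stands in for the combination of aggregation and withDefault; it only models the "use the default when aggregation fails" rule and makes no claim about how the SDK detects failure.

```typescript
// Local illustration only: aggregateOrDefault is a hypothetical helper.
function aggregateOrDefault<T>(aggregate: () => T, defaultValue?: T): T {
  try {
    return aggregate()
  } catch (err) {
    // Without a default, the failure is surfaced to the caller.
    if (defaultValue === undefined) {
      throw err
    }
    return defaultValue
  }
}

// Stand-in for an aggregation that fails because no node returned a value.
const failingAggregation = (): number => {
  throw new Error("consensus could not be reached")
}

const fallbackPrice = aggregateOrDefault(failingAggregation, 0) // 0
const normalPrice = aggregateOrDefault(() => 100, 0) // 100
```

Note that a default such as 0 is indistinguishable from a genuinely aggregated 0, so it is worth choosing a default that downstream logic can recognize and handle safely.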
