SDK Reference: Consensus & Aggregation
Aggregation is the process of taking many results from individual nodes and reducing them to a single, reliable value. This aggregated value is what the DON reaches consensus on. When you run code on individual nodes using runtime.runInNodeMode(), you must provide an aggregation strategy to tell the DON how to produce this single, trustworthy outcome. This is achieved using a ConsensusAggregation.
ConsensusAggregation<T, U>
This is a generic type passed as the second argument to runtime.runInNodeMode(). It defines the aggregation strategy and an optional default value to be used if the node-level execution fails.
There are two primary ways to specify an aggregation method:
- Using built-in functions: For simple types, use functions like consensusMedianAggregation().
- Using field-based aggregation: For complex types (objects), use ConsensusAggregationByFields().
1. Built-in aggregation functions
These functions are used for simple, single-value aggregations.
consensusMedianAggregation<T>()
Computes the median of numeric results from all nodes.
Supported Types (T):
number, bigint, Date
Usage:
import { consensusMedianAggregation, type Runtime, type NodeRuntime } from "@chainlink/cre-sdk"
const fetchPrice = (nodeRuntime: NodeRuntime<Config>): bigint => {
  // Fetch price from API
  return 100n
}
const onTrigger = (runtime: Runtime<Config>): string => {
  const price = runtime.runInNodeMode(fetchPrice, consensusMedianAggregation<bigint>())().result()
  runtime.log(`Median price: ${price}`)
  return "Success"
}
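To make the reduction concrete, the following sketch computes a median over a list of hypothetical per-node observations on a single machine. It is an illustration only, not the SDK's or the DON's implementation: `medianOf` is a hypothetical helper, and choosing the lower of the two middle values for an even number of observations is an assumption.

```typescript
// Hypothetical helper, not part of @chainlink/cre-sdk.
// Illustrates what a median reduction does to per-node observations.
function medianOf(observations: number[]): number {
  if (observations.length === 0) {
    throw new Error("no observations to aggregate")
  }
  // Copy before sorting so the caller's array is not mutated.
  const sorted = [...observations].sort((a, b) => a - b)
  // Assumption: for an even count, take the lower of the two middle values.
  return sorted[Math.floor((sorted.length - 1) / 2)]
}

// Five nodes report a price; the single outlier does not move the result.
const observations = [100, 101, 99, 100, 5000]
console.log(medianOf(observations)) // 100
```

The outlier of 5000 has no influence here, which is why a median is a common choice for numeric values that may differ slightly from node to node.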
consensusIdenticalAggregation<T>()
Ensures that a sufficient majority of nodes (a Byzantine Quorum) return the exact same value.
Supported Types (T):
Any primitive TypeScript type (string, boolean, number, bigint), or objects composed entirely of these types.
Usage:
import { consensusIdenticalAggregation, type Runtime, type NodeRuntime } from "@chainlink/cre-sdk"
const fetchBlockHash = (nodeRuntime: NodeRuntime<Config>): string => {
  // Fetch block hash from RPC
  return "0xabc123..."
}
const onTrigger = (runtime: Runtime<Config>): string => {
  const blockHash = runtime.runInNodeMode(fetchBlockHash, consensusIdenticalAggregation<string>())().result()
  runtime.log(`Block hash: ${blockHash}`)
  return "Success"
}
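The idea of requiring enough matching answers can be sketched locally as follows. This is a simplified model, not the DON's actual protocol: `identicalOf` is a hypothetical helper, and the quorum size is supplied by the caller because the exact threshold the DON applies is not specified here.

```typescript
// Hypothetical helper, not part of @chainlink/cre-sdk.
// Returns the first value reported by at least `quorum` nodes.
function identicalOf(observations: string[], quorum: number): string {
  const counts = new Map<string, number>()
  for (const value of observations) {
    const count = (counts.get(value) ?? 0) + 1
    counts.set(value, count)
    if (count >= quorum) {
      return value
    }
  }
  throw new Error("no value reached the required quorum")
}

// Four nodes, one of which disagrees; the quorum of 3 is an assumed threshold.
const hashes = ["0xabc", "0xabc", "0xdef", "0xabc"]
console.log(identicalOf(hashes, 3)) // 0xabc
```

Unlike a median, this reduction never blends values: either enough nodes agree on one exact value or the aggregation fails.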
consensusCommonPrefixAggregation<T>()
Computes the longest common prefix of the arrays returned by all nodes. This is useful for finding the longest run of leading elements that the nodes share.
Supported Types (T):
Any array of a type supported by consensusIdenticalAggregation (e.g., string[], number[]).
Usage:
import { consensusCommonPrefixAggregation, type Runtime, type NodeRuntime } from "@chainlink/cre-sdk"
const fetchBlockHeaders = (nodeRuntime: NodeRuntime<Config>): string[] => {
  // Fetch block headers for a chain fork
  return ["0xabc...", "0xdef...", "0x123..."]
}
const onTrigger = (runtime: Runtime<Config>): string => {
  const headers = runtime.runInNodeMode(fetchBlockHeaders, consensusCommonPrefixAggregation<string>())().result()
  runtime.log(`Common prefix length: ${headers.length}`)
  return "Success"
}
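As a rough illustration of what a common-prefix reduction returns, the sketch below compares hypothetical per-node arrays element by element from the front. `commonPrefixOf` is a hypothetical helper, and it assumes every node must agree; how the DON tolerates faulty or missing nodes is not modeled.

```typescript
// Hypothetical helper, not part of @chainlink/cre-sdk.
// Returns the leading elements shared by every list.
function commonPrefixOf<T>(lists: T[][]): T[] {
  if (lists.length === 0) {
    return []
  }
  const [first, ...rest] = lists
  let length = first.length
  for (const list of rest) {
    let i = 0
    // Advance while both lists have an element at i and the elements match.
    while (i < length && i < list.length && list[i] === first[i]) {
      i++
    }
    length = i
  }
  return first.slice(0, length)
}

// Three nodes agree on the first two headers and diverge afterwards.
const perNodeHeaders = [
  ["0xabc", "0xdef", "0x123"],
  ["0xabc", "0xdef", "0x999"],
  ["0xabc", "0xdef"],
]
console.log(commonPrefixOf(perNodeHeaders)) // the two shared headers: 0xabc, 0xdef
```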
consensusCommonSuffixAggregation<T>()
Computes the longest common suffix of the arrays returned by all nodes. This is useful for finding the longest run of trailing elements that the nodes share.
Supported Types (T):
Any array of a type supported by consensusIdenticalAggregation (e.g., string[], number[]).
Usage:
import { consensusCommonSuffixAggregation, type Runtime, type NodeRuntime } from "@chainlink/cre-sdk"
const fetchRecentTransactions = (nodeRuntime: NodeRuntime<Config>): string[] => {
  // Fetch recent transaction IDs
  return ["0x111...", "0x222...", "0x333..."]
}
const onTrigger = (runtime: Runtime<Config>): string => {
  const recentTxs = runtime
    .runInNodeMode(fetchRecentTransactions, consensusCommonSuffixAggregation<string>())()
    .result()
  runtime.log(`Common suffix length: ${recentTxs.length}`)
  return "Success"
}
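The suffix case mirrors the prefix case but compares from the end of each array. The sketch below is a local illustration with a hypothetical helper, `commonSuffixOf`, and it again assumes that every node must agree.

```typescript
// Hypothetical helper, not part of @chainlink/cre-sdk.
// Returns the trailing elements shared by every list.
function commonSuffixOf<T>(lists: T[][]): T[] {
  if (lists.length === 0) {
    return []
  }
  const [first, ...rest] = lists
  let length = first.length
  for (const list of rest) {
    let i = 0
    // Walk backwards from the last element of both lists while they match.
    while (
      i < length &&
      i < list.length &&
      list[list.length - 1 - i] === first[first.length - 1 - i]
    ) {
      i++
    }
    length = i
  }
  return first.slice(first.length - length)
}

// Nodes started observing at different points but agree on the latest two IDs.
const perNodeTxs = [
  ["0x000", "0x222", "0x333"],
  ["0x111", "0x222", "0x333"],
  ["0x222", "0x333"],
]
console.log(commonSuffixOf(perNodeTxs)) // the two shared IDs: 0x222, 0x333
```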
2. Field-based aggregation for objects
For objects with multiple fields, the recommended approach is to use ConsensusAggregationByFields. This function allows you to specify different aggregation strategies for each field of your return type.
ConsensusAggregationByFields<T>(fields)
Creates a consensus aggregation strategy by specifying how to aggregate each field of an object.
Parameters:
fields: An object where each key corresponds to a field in your type T, and each value is a field aggregation function.
Field Aggregation Functions:
| Function | Description | Compatible Field Types |
|---|---|---|
| median() | Computes the median of the field's value across all nodes | number, bigint, Date |
| identical() | Ensures the field's value is identical across all nodes | Primitives, objects |
| commonPrefix() | Finds the longest common prefix for an array from all nodes | Arrays (string[], number[]) |
| commonSuffix() | Finds the longest common suffix for an array from all nodes | Arrays |
| ignore() | This field will be ignored during consensus | Any |
Usage:
import {
  cre,
  ConsensusAggregationByFields,
  median,
  identical,
  type Runtime,
  type HTTPSendRequester,
} from "@chainlink/cre-sdk"
type ReserveInfo = {
  lastUpdated: Date
  totalReserve: number
  source: string
}
const fetchReserveData = (sendRequester: HTTPSendRequester, config: Config): ReserveInfo => {
  const response = sendRequester.sendRequest({ url: config.apiUrl }).result()
  const data = JSON.parse(response.body.toString())
  return {
    lastUpdated: new Date(data.timestamp * 1000),
    totalReserve: data.reserve,
    source: data.source,
  }
}
const onTrigger = (runtime: Runtime<Config>): string => {
  const httpClient = new cre.capabilities.HTTPClient()
  const reserveInfo = httpClient
    .sendRequest(
      runtime,
      fetchReserveData,
      ConsensusAggregationByFields<ReserveInfo>({
        lastUpdated: median, // Use median for timestamp
        totalReserve: median, // Use median for reserve amount
        source: identical, // Ensure source is identical across nodes
      })
    )(runtime.config)
    .result()
  runtime.log(`Reserve: ${reserveInfo.totalReserve}`)
  return "Success"
}
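One consequence of aggregating each field independently is that the final object may combine values from different nodes. The sketch below illustrates this locally under simplifying assumptions: `ReserveObservation`, `medianOfNumbers`, `unanimousValue`, and `aggregateReserve` are hypothetical names, the timestamp is a plain number rather than a Date, and the identical check requires unanimity instead of a quorum. It is not how the SDK implements field-based aggregation.

```typescript
// Hypothetical shape, loosely modeled on the ReserveInfo example above.
type ReserveObservation = {
  lastUpdatedSec: number
  totalReserve: number
  source: string
}

// Lower-middle median of a non-empty list (even-count handling is an assumption).
function medianOfNumbers(values: number[]): number {
  if (values.length === 0) {
    throw new Error("no values to aggregate")
  }
  const sorted = [...values].sort((a, b) => a - b)
  return sorted[Math.floor((sorted.length - 1) / 2)]
}

// Returns the shared value, or throws if any observation differs.
// Simplification: requires unanimity rather than a quorum.
function unanimousValue(values: string[]): string {
  if (values.length === 0 || values.some((v) => v !== values[0])) {
    throw new Error("values are not identical")
  }
  return values[0]
}

// Each field is reduced on its own, independently of the other fields.
function aggregateReserve(observations: ReserveObservation[]): ReserveObservation {
  return {
    lastUpdatedSec: medianOfNumbers(observations.map((o) => o.lastUpdatedSec)),
    totalReserve: medianOfNumbers(observations.map((o) => o.totalReserve)),
    source: unanimousValue(observations.map((o) => o.source)),
  }
}

const aggregated = aggregateReserve([
  { lastUpdatedSec: 1700000000, totalReserve: 1002, source: "api" },
  { lastUpdatedSec: 1700000005, totalReserve: 998, source: "api" },
  { lastUpdatedSec: 1700000010, totalReserve: 1000, source: "api" },
])
// The timestamp comes from the second node and the reserve from the third.
console.log(aggregated)
```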
Complete Example
Here's a complete example demonstrating both simple and field-based aggregation:
import {
  cre,
  Runner,
  consensusMedianAggregation,
  ConsensusAggregationByFields,
  median,
  identical,
  type Runtime,
  type NodeRuntime,
  type HTTPSendRequester,
  type CronPayload,
} from "@chainlink/cre-sdk"
type Config = {
  apiUrl: string
}
type PriceData = {
  price: bigint
  timestamp: bigint
  source: string
}
// Simple aggregation example
const fetchSimplePrice = (nodeRuntime: NodeRuntime<Config>): bigint => {
  const httpClient = new cre.capabilities.HTTPClient()
  const response = httpClient.sendRequest(nodeRuntime, { url: nodeRuntime.config.apiUrl }).result()
  const data = JSON.parse(response.body.toString())
  return BigInt(data.price)
}
// Field-based aggregation example
const fetchPriceData = (sendRequester: HTTPSendRequester, config: Config): PriceData => {
  const response = sendRequester.sendRequest({ url: config.apiUrl }).result()
  const data = JSON.parse(response.body.toString())
  return {
    price: BigInt(data.price),
    timestamp: BigInt(data.timestamp),
    source: data.source,
  }
}
const onCronTrigger = (runtime: Runtime<Config>, payload: CronPayload): string => {
  // Example 1: Simple median aggregation
  const simplePrice = runtime.runInNodeMode(fetchSimplePrice, consensusMedianAggregation<bigint>())().result()
  runtime.log(`Simple median price: ${simplePrice}`)
  // Example 2: Field-based aggregation
  const httpClient = new cre.capabilities.HTTPClient()
  const priceData = httpClient
    .sendRequest(
      runtime,
      fetchPriceData,
      ConsensusAggregationByFields<PriceData>({
        price: median, // Median of price values
        timestamp: median, // Median of timestamps
        source: identical, // Must be identical across nodes
      })
    )(runtime.config)
    .result()
  runtime.log(`Aggregated price: ${priceData.price} from ${priceData.source}`)
  return "Price data aggregated successfully"
}
const initWorkflow = (config: Config) => {
  const cron = new cre.capabilities.CronCapability()
  return [cre.handler(cron.trigger({ schedule: "0 */5 * * * *" }), onCronTrigger)]
}
export async function main() {
  const runner = await Runner.newRunner<Config>()
  await runner.run(initWorkflow)
}
main()
Default Values
You can specify a default value to be used if node-level execution fails:
const price = runtime.runInNodeMode(fetchPrice, consensusMedianAggregation<bigint>().withDefault(0n))().result()
If all nodes fail or consensus cannot be reached, the default value (0n in this example) will be used instead.
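The fallback behavior described above can be pictured with a small local sketch. `aggregateWithDefault` is a hypothetical helper that models "use the aggregated value if there is one, otherwise the default"; it is not the SDK's implementation of withDefault().

```typescript
// Hypothetical helper, not part of @chainlink/cre-sdk.
// Runs an aggregation and falls back to a default if it throws.
function aggregateWithDefault<T>(aggregate: () => T, defaultValue: T): T {
  try {
    return aggregate()
  } catch {
    return defaultValue
  }
}

// Stand-in for an aggregation that fails, e.g. because consensus was not reached.
const failingAggregation = (): number => {
  throw new Error("consensus could not be reached")
}

console.log(aggregateWithDefault(failingAggregation, 0)) // 0
console.log(aggregateWithDefault(() => 100, 0)) // 100
```

Note that in this model a default of 0 cannot be told apart from a genuine aggregated value of 0, so a default that can never occur as a real result may be easier to handle downstream.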