ProjectionEngine

Fact aggregation system for building materialized views from event streams.

Prerequisites: yaml-manifests.md


The ProjectionEngine processes facts and generates aggregations based on declarative YAML configuration. It transforms raw event streams into queryable metrics, supporting:

  • 6 aggregation functions - count, sum, avg, min, max, count_distinct
  • Time windows - bucket by minute, hour, day, or month
  • Multi-dimensional analysis - groupBy multiple fields
  • Materialization - store results in CachedState for instant retrieval

ProjectionEngine is ideal for:

  • Analytics dashboards - page views, sessions, conversions
  • Usage tracking - API calls, compute minutes, storage bytes
  • Business metrics - revenue, orders, active users
  • Operational monitoring - error rates, average and peak latency

import { parseProjectionConfig, ProjectionEngine } from '@z0-app/sdk';

const yaml = `
source: api_request
factTypes: [request_completed]
timeWindow: hour
groupBy: [data.endpoint, data.status]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
materialize: true
`;

const config = parseProjectionConfig(yaml);
const engine = new ProjectionEngine(config);
const results = engine.process(facts);
// => { "2026-03-12T10:/api/v1:200": { count: 42, duration_ms_avg: 120.5 } }

ProjectionEngine accepts either a ProjectionConfig parsed from YAML via parseProjectionConfig() or a versioned Config<ProjectionConfigSettings> object for full z0 Config pattern integration.

function parseProjectionConfig(yaml: string): ProjectionConfig

interface ProjectionConfig {
  source: string;              // Source entity type
  factTypes: string[];         // Fact types to include
  timeWindow?: TimeWindow;     // Optional time bucketing
  groupBy?: string[];          // Optional grouping fields
  aggregations: Aggregation[]; // Aggregations to compute
  materialize: boolean;        // Store in CachedState?
  storageVersion?: number;     // Storage format version (1=blob, 2=sparse)
}

interface ProjectionConfigSettings {
  source: string;              // Source entity type
  factTypes: string[];         // Fact types to include
  timeWindow?: TimeWindow;     // Optional time bucketing
  groupBy?: string[];          // Optional grouping fields
  aggregations: Aggregation[]; // Aggregations to compute
  materialize: boolean;        // Store in CachedState?
  storageVersion: number;      // Storage format version (1=blob, 2=sparse)
  rollup?: RollupConfig;       // Optional auto-rollup configuration
}

type TimeWindow = 'minute' | 'hour' | 'day' | 'month';

interface Aggregation {
  field: string; // Field to aggregate (use 'count' for count function)
  function: AggregationFunction;
}

type AggregationFunction = 'count' | 'sum' | 'avg' | 'min' | 'max' | 'count_distinct';

import {
  parseProjectionConfig,
  ConfigParseError,
  ConfigValidationError
} from '@z0-app/sdk';

try {
  const config = parseProjectionConfig(yamlString);
} catch (err) {
  if (err instanceof ConfigParseError) {
    console.error('Invalid YAML syntax:', err.message);
  } else if (err instanceof ConfigValidationError) {
    console.error('Config validation failed:', err.message);
    console.error('Issues:', err.issues); // Array of specific errors
  }
}

ProjectionEngine supports two storage formats controlled by the storageVersion field:

  • storageVersion: 1 (default, blob format) - All buckets stored as single JSON blob
  • storageVersion: 2 (sparse format) - Each bucket stored as separate key (recommended for large datasets)

Sparse storage benefits:

  • Efficient time-range queries - Load only buckets in range, not entire dataset
  • Incremental updates - Update single bucket without reading/writing all data
  • Lower memory usage - Process one bucket at a time
  • Better for high-cardinality groupBy - Scale to millions of buckets

When using sparse storage (storageVersion: 2), buckets are stored with this key pattern:

{projectionKey}:{timeWindow}:{bucketKey}
{projectionKey}:{timeWindow}:{bucketKey}:{groupBy[0]}:{groupBy[1]}:...

Examples:

// Time window only
"daily_usage:day:2026-03-12"
// Time window + single groupBy
"api_metrics:hour:2026-03-12T10:US"
// Time window + multiple groupBy
"orders:day:2026-03-12:electronics:credit"

source: api_request
factTypes: [completed]
timeWindow: day
groupBy: [data.country]
aggregations:
  - field: count
    function: count
materialize: true
storageVersion: 2 # Enable sparse storage

const config = parseProjectionConfig(yaml);
const engine = new ProjectionEngine(config);
// Each day+country bucket stored separately
const result = engine.process(facts, {
  cachedStateManager,
  projectionKey: 'api_usage',
});
// Storage keys created:
// - api_usage:day:2026-03-12:US
// - api_usage:day:2026-03-12:UK
// - api_usage:day:2026-03-13:US
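
The key pattern above can be sketched as a small helper. This is an illustration only: sparseStorageKey is a hypothetical name and not an SDK export, and the sketch assumes the key segments are joined with a plain colon, as the examples suggest.

```typescript
// Hypothetical helper illustrating the documented key pattern; not part of @z0-app/sdk.
type TimeWindow = 'minute' | 'hour' | 'day' | 'month';

function sparseStorageKey(
  projectionKey: string,
  timeWindow: TimeWindow,
  bucketKey: string,
  groupValues: ReadonlyArray<string | number> = [],
): string {
  // {projectionKey}:{timeWindow}:{bucketKey}[:{groupBy[0]}:{groupBy[1]}:...]
  return [projectionKey, timeWindow, bucketKey, ...groupValues.map(String)].join(':');
}

const keys: string[] = [
  sparseStorageKey('daily_usage', 'day', '2026-03-12'),
  sparseStorageKey('api_usage', 'day', '2026-03-12', ['US']),
];
console.log(keys);
```

Note that minute and hour bucket keys such as 2026-03-12T10:15 can themselves contain a colon, so a key built this way cannot be split back into its parts on ':' alone.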

When storageVersion changes in config, ProjectionEngine automatically migrates data and emits migration facts for an audit trail:

import { PROJECTION_FACT_TYPES } from '@z0-app/sdk';
// Facts emitted during migration:
// - projection.migration_started
// - projection.migration_completed (with duration and bucket count)
// - projection.migration_failed (on error, original data intact)

Migration is triggered when:

  1. Config storageVersion changes (e.g., 1 → 2)
  2. Engine processes facts with new config version
  3. CachedStateManager detects version mismatch

Migration safety:

  • Original data preserved until migration succeeds
  • Atomic switch to new format
  • Automatic rollback on failure

ProjectionEngine supports six aggregation functions:

count

Count the number of facts in each bucket.

aggregations:
  - field: count
    function: count

Result key: count

Example:

const config = parseProjectionConfig(`
source: pageview
factTypes: [viewed]
aggregations:
  - field: count
    function: count
`);
const engine = new ProjectionEngine(config);
const result = engine.process(facts);
// => { count: 42 }

sum

Sum numeric values across facts.

aggregations:
  - field: data.amount
    function: sum

Result key: {field}_sum, where {field} is the field path without the leading data. prefix (e.g., amount_sum)

Example:

// Facts: [{ data: { amount: 100 } }, { data: { amount: 250 } }]
// Result: { amount_sum: 350 }

avg

Calculate the mean of numeric values.

aggregations:
  - field: data.duration_ms
    function: avg

Result key: {field}_avg (e.g., duration_ms_avg)

Example:

// Facts: [{ data: { duration_ms: 100 } }, { data: { duration_ms: 200 } }]
// Result: { duration_ms_avg: 150 }

min

Find the minimum numeric value.

aggregations:
  - field: data.response_time
    function: min

Result key: {field}_min (e.g., response_time_min)

Example:

// Facts: [{ data: { response_time: 50 } }, { data: { response_time: 120 } }]
// Result: { response_time_min: 50 }

max

Find the maximum numeric value.

aggregations:
  - field: data.response_time
    function: max

Result key: {field}_max (e.g., response_time_max)

Example:

// Facts: [{ data: { response_time: 50 } }, { data: { response_time: 120 } }]
// Result: { response_time_max: 120 }

count_distinct

Count unique values (uses a Set internally).

aggregations:
  - field: data.user_id
    function: count_distinct

Result key: {field}_distinct (e.g., user_id_distinct)

Example:

// Facts: [
//   { data: { user_id: 'u1' } },
//   { data: { user_id: 'u1' } },
//   { data: { user_id: 'u2' } }
// ]
// Result: { user_id_distinct: 2 }
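
For intuition, the six functions can be sketched over a plain array of extracted field values. This is a simplified illustration rather than the engine's implementation: aggregate is a hypothetical function, and returning 0 for an empty numeric input is an assumption the documentation does not confirm.

```typescript
// Illustrative sketch only; `aggregate` is hypothetical and not an SDK export.
type AggregationFunction = 'count' | 'sum' | 'avg' | 'min' | 'max' | 'count_distinct';

function aggregate(fn: AggregationFunction, values: ReadonlyArray<number | string>): number {
  if (fn === 'count') return values.length;
  // A Set keeps one entry per unique value, so its size is the distinct count.
  if (fn === 'count_distinct') return new Set(values).size;

  // The remaining functions only make sense for numeric values.
  const nums = values.filter((v): v is number => typeof v === 'number');
  if (nums.length === 0) return 0; // Assumption: empty input yields 0

  const total = nums.reduce((a, b) => a + b, 0);
  if (fn === 'sum') return total;
  if (fn === 'avg') return total / nums.length;
  if (fn === 'min') return Math.min(...nums);
  return Math.max(...nums); // fn === 'max'
}

console.log(aggregate('sum', [100, 250]));
console.log(aggregate('count_distinct', ['u1', 'u1', 'u2']));
```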

Time windows bucket facts by timestamp for temporal analysis.

| Window | Format           | Example          |
|--------|------------------|------------------|
| minute | YYYY-MM-DDTHH:mm | 2026-03-12T10:15 |
| hour   | YYYY-MM-DDTHH    | 2026-03-12T10    |
| day    | YYYY-MM-DD       | 2026-03-12       |
| month  | YYYY-MM          | 2026-03          |

All timestamps use UTC.
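
A minimal sketch of how such bucket keys could be derived, assuming each key is simply a prefix of the UTC ISO 8601 string. timeBucket and ISO_PREFIX_LENGTH are hypothetical names, not SDK exports.

```typescript
// Hypothetical helper; shows one way to produce the formats in the table above.
type TimeWindow = 'minute' | 'hour' | 'day' | 'month';

// Number of leading characters of "YYYY-MM-DDTHH:mm:ss.sssZ" to keep per window.
const ISO_PREFIX_LENGTH: Record<TimeWindow, number> = {
  minute: 16, // YYYY-MM-DDTHH:mm
  hour: 13,   // YYYY-MM-DDTHH
  day: 10,    // YYYY-MM-DD
  month: 7,   // YYYY-MM
};

function timeBucket(timestampMs: number, window: TimeWindow): string {
  // toISOString() always formats in UTC, matching the UTC rule above.
  return new Date(timestampMs).toISOString().slice(0, ISO_PREFIX_LENGTH[window]);
}

const ts = Date.UTC(2026, 2, 12, 10, 15); // Months are zero-based: 2 = March
console.log(timeBucket(ts, 'minute'), timeBucket(ts, 'hour'), timeBucket(ts, 'day'), timeBucket(ts, 'month'));
```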

source: pageview
factTypes: [viewed]
timeWindow: hour # Bucket by hour
aggregations:
  - field: count
    function: count

const config = parseProjectionConfig(`
source: pageview
factTypes: [viewed]
timeWindow: hour
aggregations:
  - field: count
    function: count
`);
const facts = [
  { type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 10, 15), data: {} },
  { type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 10, 45), data: {} },
  { type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 11, 5), data: {} },
];
const engine = new ProjectionEngine(config);
const result = engine.process(facts);
console.log(result);
// Output:
// {
//   "2026-03-12T10": { count: 2 },
//   "2026-03-12T11": { count: 1 }
// }

Multi-Dimensional Aggregation with groupBy

The groupBy field enables dimensional analysis by grouping facts by one or more fields.

source: api_request
factTypes: [completed]
groupBy: [data.endpoint, data.status]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg

Example: API Metrics by Endpoint and Status

const config = parseProjectionConfig(`
source: api_request
factTypes: [completed]
groupBy: [data.endpoint, data.status]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
`);
const facts = [
  { type: 'completed', data: { endpoint: '/api/v1', status: 200, duration_ms: 100 } },
  { type: 'completed', data: { endpoint: '/api/v1', status: 200, duration_ms: 150 } },
  { type: 'completed', data: { endpoint: '/api/v1', status: 500, duration_ms: 2000 } },
  { type: 'completed', data: { endpoint: '/api/v2', status: 200, duration_ms: 80 } },
];
const engine = new ProjectionEngine(config);
const result = engine.process(facts);
console.log(result);
// Output:
// {
//   "/api/v1:200": { count: 2, duration_ms_avg: 125 },
//   "/api/v1:500": { count: 1, duration_ms_avg: 2000 },
//   "/api/v2:200": { count: 1, duration_ms_avg: 80 }
// }

source: api_request
factTypes: [completed]
timeWindow: day
groupBy: [data.country]
aggregations:
  - field: count
    function: count

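Bucket key format: {timeBucket}:{groupBy[0]}:{groupBy[1]}:..., where {timeBucket} is the formatted time window value (e.g., 2026-03-12) and each groupBy segment is that field's value.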

Example result:

{
  "2026-03-12:US": { count: 42 },
  "2026-03-12:UK": { count: 18 },
  "2026-03-13:US": { count: 55 }
}
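
How a time bucket and groupBy values combine into one bucket key can be sketched as follows. This is a simplified illustration, not the engine's code: countByDayAndFields is a hypothetical function, and for brevity the groupBy names here are plain keys of fact.data rather than dot-notation paths.

```typescript
// Hypothetical sketch of day + groupBy bucketing with a count aggregation.
interface Fact {
  timestamp: number;
  data: Record<string, unknown>;
}

function countByDayAndFields(
  facts: ReadonlyArray<Fact>,
  groupBy: ReadonlyArray<string>,
): Record<string, { count: number }> {
  const out: Record<string, { count: number }> = {};
  for (const fact of facts) {
    const day = new Date(fact.timestamp).toISOString().slice(0, 10); // YYYY-MM-DD in UTC
    const dims = groupBy.map((field) => String(fact.data[field]));
    const key = [day, ...dims].join(':'); // time bucket first, then group values
    const current = out[key];
    out[key] = { count: (current ? current.count : 0) + 1 };
  }
  return out;
}

const counts = countByDayAndFields(
  [
    { timestamp: Date.UTC(2026, 2, 12, 8), data: { country: 'US' } },
    { timestamp: Date.UTC(2026, 2, 12, 9), data: { country: 'US' } },
    { timestamp: Date.UTC(2026, 2, 12, 10), data: { country: 'UK' } },
    { timestamp: Date.UTC(2026, 2, 13, 1), data: { country: 'US' } },
  ],
  ['country'],
);
console.log(counts);
```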

When materialize: true, ProjectionEngine stores aggregation results in CachedState for instant retrieval.

  • Fast queries - Pre-computed results, no fact scanning
  • Incremental updates - Process only new facts
  • Consistency - Tracks factsThroughId to detect gaps

source: usage
factTypes: [api_call, compute_used]
timeWindow: day
aggregations:
  - field: count
    function: count
materialize: true # Store in CachedState

import { ProjectionEngine, parseProjectionConfig } from '@z0-app/sdk';
import type { CachedStateManager } from '@z0-app/sdk';

const config = parseProjectionConfig(`
source: usage
factTypes: [api_call]
timeWindow: day
aggregations:
  - field: count
    function: count
materialize: true
`);
const engine = new ProjectionEngine(config);
// Process facts and materialize
const result = engine.process(facts, {
  cachedStateManager,           // CachedStateManager instance
  projectionKey: 'daily_usage', // Key for storing in CachedState
  factsThroughId: 'fact_xyz',   // Latest fact ID processed
});
// Later, retrieve from CachedState without recomputing
const cached = cachedStateManager.get('daily_usage');
console.log(cached);
// => {
//   value: { "2026-03-12": { count: 42 } },
//   factsThroughId: "fact_xyz"
// }

When new facts arrive, only process facts after factsThroughId:

// Initial computation
const result1 = engine.process(facts, {
  cachedStateManager,
  projectionKey: 'daily_usage',
  factsThroughId: 'fact_100',
});
// Later, new facts arrive (fact_101, fact_102, ...)
// Note: this string comparison only works if fact IDs sort lexicographically in creation order
const newFacts = facts.filter(f => f.id > 'fact_100');
const result2 = engine.process(newFacts, {
  cachedStateManager,
  projectionKey: 'daily_usage',
  factsThroughId: 'fact_150',
});
// CachedState now updated with new aggregations
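
The watermark idea behind factsThroughId can be sketched in isolation. This is a minimal illustration under stated assumptions: it uses a numeric sequence number (seq) instead of string fact IDs to sidestep ordering pitfalls, and CachedCounts, SeqFact, and applyNewFacts are hypothetical names, not SDK types.

```typescript
// Hypothetical sketch: merge only facts newer than the stored watermark.
interface CachedCounts {
  value: Record<string, number>; // day bucket -> count
  factsThroughSeq: number;       // highest sequence number already counted
}

interface SeqFact {
  seq: number; // Assumption: monotonically increasing per fact
  day: string; // Pre-computed day bucket, e.g. "2026-03-12"
}

function applyNewFacts(cached: CachedCounts, facts: ReadonlyArray<SeqFact>): CachedCounts {
  const value: Record<string, number> = { ...cached.value };
  let through = cached.factsThroughSeq;
  for (const fact of facts) {
    if (fact.seq <= cached.factsThroughSeq) continue; // Already reflected in the cache
    value[fact.day] = (value[fact.day] ?? 0) + 1;
    through = Math.max(through, fact.seq);
  }
  return { value, factsThroughSeq: through };
}

const updated = applyNewFacts(
  { value: { '2026-03-12': 2 }, factsThroughSeq: 100 },
  [
    { seq: 99, day: '2026-03-12' },  // Old fact, skipped
    { seq: 101, day: '2026-03-12' },
    { seq: 102, day: '2026-03-13' },
  ],
);
console.log(updated);
```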
// CachedState now updated with new aggregations

Use dot notation to access nested fields in fact data:

const config = parseProjectionConfig(`
source: order
factTypes: [placed]
aggregations:
  - field: data.payment.amount
    function: sum
  - field: data.customer.country
    function: count_distinct
`);
// Facts:
// [
//   { type: 'placed', data: { payment: { amount: 100 }, customer: { country: 'US' } } },
//   { type: 'placed', data: { payment: { amount: 250 }, customer: { country: 'UK' } } }
// ]
//
// Result:
// {
//   payment_amount_sum: 350,
//   customer_country_distinct: 2
// }
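
A rough sketch of dot-notation lookup and of the result-key naming seen in the example above. Both helpers are hypothetical: getPath and resultKey are not SDK exports, and the naming rule (drop the leading data. and join the remaining segments with underscores) is inferred from the example results rather than confirmed by the SDK.

```typescript
// Hypothetical helpers; the naming rule is inferred from the documented results.
function getPath(source: unknown, path: string): unknown {
  let current: unknown = source;
  for (const segment of path.split('.')) {
    // Stop early if an intermediate value is missing or not an object.
    if (typeof current !== 'object' || current === null) return undefined;
    current = (current as Record<string, unknown>)[segment];
  }
  return current;
}

function resultKey(field: string, suffix: string): string {
  const withoutData = field.startsWith('data.') ? field.slice('data.'.length) : field;
  return `${withoutData.split('.').join('_')}_${suffix}`;
}

const fact = { type: 'placed', data: { payment: { amount: 100 }, customer: { country: 'US' } } };
console.log(getPath(fact, 'data.payment.amount'));
console.log(resultKey('data.payment.amount', 'sum'));
```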

Track page views and engagement by hour:

source: pageview
factTypes: [viewed, engaged]
timeWindow: hour
groupBy: [data.page_type]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
materialize: true

const config = parseProjectionConfig(yaml);
const engine = new ProjectionEngine(config);
// Each fact carries an id so factsThroughId below is defined
const facts = [
  { id: 'fact_1', type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 10, 15), data: { page_type: 'homepage', duration_ms: 5000 } },
  { id: 'fact_2', type: 'viewed', timestamp: Date.UTC(2026, 2, 12, 10, 30), data: { page_type: 'homepage', duration_ms: 3000 } },
  { id: 'fact_3', type: 'engaged', timestamp: Date.UTC(2026, 2, 12, 10, 45), data: { page_type: 'product', duration_ms: 12000 } },
];
const result = engine.process(facts, {
  cachedStateManager,
  projectionKey: 'hourly_analytics',
  factsThroughId: facts[facts.length - 1].id,
});
console.log(result);
// {
//   "2026-03-12T10:homepage": { count: 2, duration_ms_avg: 4000 },
//   "2026-03-12T10:product": { count: 1, duration_ms_avg: 12000 }
// }

Calculate daily usage per customer:

source: usage
factTypes: [api_call, compute_used, storage_used]
timeWindow: day
groupBy: [data.customer_id]
aggregations:
  - field: count
    function: count
  - field: data.compute_ms
    function: sum
  - field: data.storage_bytes
    function: max
materialize: true

const config = parseProjectionConfig(yaml);
const engine = new ProjectionEngine(config);
// Each fact carries an id so factsThroughId below is defined
const facts = [
  { id: 'fact_1', type: 'api_call', timestamp: Date.UTC(2026, 2, 12, 8, 0), data: { customer_id: 'c1', compute_ms: 100 } },
  { id: 'fact_2', type: 'api_call', timestamp: Date.UTC(2026, 2, 12, 14, 0), data: { customer_id: 'c1', compute_ms: 200 } },
  { id: 'fact_3', type: 'storage_used', timestamp: Date.UTC(2026, 2, 12, 20, 0), data: { customer_id: 'c1', storage_bytes: 1024 } },
];
const result = engine.process(facts, {
  cachedStateManager,
  projectionKey: 'daily_usage',
  factsThroughId: facts[facts.length - 1].id,
});
console.log(result);
// {
//   "2026-03-12:c1": {
//     count: 3,
//     compute_ms_sum: 300,
//     storage_bytes_max: 1024
//   }
// }

Track errors by endpoint and calculate percentages:

source: api_request
factTypes: [completed]
timeWindow: minute
groupBy: [data.endpoint, data.status_category]
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
  - field: data.duration_ms
    function: max
materialize: false

const config = parseProjectionConfig(yaml);
const engine = new ProjectionEngine(config);
const facts = [
  { type: 'completed', timestamp: Date.UTC(2026, 2, 12, 10, 15, 5), data: { endpoint: '/api/users', status_category: '2xx', duration_ms: 50 } },
  { type: 'completed', timestamp: Date.UTC(2026, 2, 12, 10, 15, 10), data: { endpoint: '/api/users', status_category: '2xx', duration_ms: 75 } },
  { type: 'completed', timestamp: Date.UTC(2026, 2, 12, 10, 15, 20), data: { endpoint: '/api/users', status_category: '5xx', duration_ms: 2000 } },
];
const result = engine.process(facts);
console.log(result);
// {
//   "2026-03-12T10:15:/api/users:2xx": { count: 2, duration_ms_avg: 62.5, duration_ms_max: 75 },
//   "2026-03-12T10:15:/api/users:5xx": { count: 1, duration_ms_avg: 2000, duration_ms_max: 2000 }
// }
// Calculate error rate
const success = result["2026-03-12T10:15:/api/users:2xx"].count;
const error = result["2026-03-12T10:15:/api/users:5xx"].count;
const errorRate = error / (success + error);
console.log(`Error rate: ${(errorRate * 100).toFixed(2)}%`); // "33.33%"

Query specific time ranges without loading the entire projection dataset.

engine.query<T>(
  projectionKey: string,
  options?: TimeRangeQueryOptions
): Promise<Record<string, T>>

interface TimeRangeQueryOptions {
  from?: number;       // Start timestamp (inclusive)
  to?: number;         // End timestamp (inclusive)
  aggregate?: boolean; // Aggregate all buckets into single result
  mode?: QueryMode;    // 'materialized' (default) or 'compute'
}

Performance: O(k) where k = number of buckets in range (not O(n) total buckets)

Query specific date range:

const engine = new ProjectionEngine(config);
// Get March 12-14 data only
const result = await engine.query('daily_usage', {
  from: Date.UTC(2026, 2, 12, 0, 0, 0),
  to: Date.UTC(2026, 2, 14, 23, 59, 59),
});
// Returns only buckets in range:
// {
//   "2026-03-12:US": { count: 42 },
//   "2026-03-13:US": { count: 55 },
//   "2026-03-14:US": { count: 38 }
// }
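
One way to see why a range query can cost O(k) with sparse storage: the bucket keys in a range can be enumerated directly from the from and to timestamps, so only those keys need to be read. The sketch below covers ungrouped day buckets only; dayBucketsInRange is a hypothetical helper, and how the engine actually locates grouped buckets is not described here.

```typescript
// Hypothetical helper: list the UTC day bucket keys covered by [from, to].
const DAY_MS = 24 * 60 * 60 * 1000;

function dayBucketsInRange(from: number, to: number): string[] {
  const keys: string[] = [];
  // Snap to UTC midnight of the first day (valid for non-negative epoch timestamps).
  let cursor = Math.floor(from / DAY_MS) * DAY_MS;
  while (cursor <= to) {
    keys.push(new Date(cursor).toISOString().slice(0, 10));
    cursor += DAY_MS;
  }
  return keys;
}

const rangeKeys = dayBucketsInRange(
  Date.UTC(2026, 2, 12, 0, 0, 0),
  Date.UTC(2026, 2, 14, 23, 59, 59),
);
console.log(rangeKeys);
```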

Open-ended ranges:

// All data from March 12 onwards
const fromMarch = await engine.query('daily_usage', {
  from: Date.UTC(2026, 2, 12),
});
// All data up to March 14
const throughMarch = await engine.query('daily_usage', {
  to: Date.UTC(2026, 2, 14, 23, 59, 59),
});

Aggregate across range:

// Single aggregated result for entire week
const weekTotal = await engine.query('daily_usage', {
  from: Date.UTC(2026, 2, 10),
  to: Date.UTC(2026, 2, 16, 23, 59, 59),
  aggregate: true,
});
// Returns:
// {
//   "aggregated": {
//     count: 300,            // sum of all counts
//     duration_ms_avg: 125.5 // weighted average
//   }
// }
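
The "weighted average" in the aggregated result matters because a plain mean of per-bucket averages would over-weight small buckets. A minimal sketch, assuming each bucket's average is weighted by its count; AvgBucket and aggregateBuckets are hypothetical names.

```typescript
// Hypothetical sketch: combine per-bucket averages using counts as weights.
interface AvgBucket {
  count: number;
  duration_ms_avg: number;
}

function aggregateBuckets(buckets: ReadonlyArray<AvgBucket>): AvgBucket {
  let count = 0;
  let weightedSum = 0;
  for (const bucket of buckets) {
    count += bucket.count;
    weightedSum += bucket.duration_ms_avg * bucket.count; // Recover each bucket's total
  }
  // Assumption: an empty range reports an average of 0.
  return { count, duration_ms_avg: count === 0 ? 0 : weightedSum / count };
}

const combined = aggregateBuckets([
  { count: 2, duration_ms_avg: 125 },
  { count: 1, duration_ms_avg: 2000 },
]);
console.log(combined);
```

Here the weighted result is 750 ms, whereas naively averaging 125 and 2000 would give 1062.5 ms.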

ProjectionEngine supports two query modes for different use cases:

Materialized mode (default)

Reads from stored CachedState. Fast, but requires prior materialization.

const result = await engine.query('daily_usage', {
  mode: 'materialized', // default
});

Use when:

  • Data is already materialized
  • Query performance is critical
  • You want to read pre-computed results

Compute mode

Computes directly from source facts without touching storage. No side effects.

const result = await engine.query('daily_usage', {
  from: Date.UTC(2026, 2, 12),
  to: Date.UTC(2026, 2, 14),
  mode: 'compute',
});

Use when:

  • Running ad-hoc queries on historical data
  • Testing projections before materializing
  • You don’t want to modify cached state
  • Debugging or validation

Characteristics:

  • No reads from CachedState
  • No writes to CachedState
  • Computes aggregations on-demand from facts
  • Can be combined with time range filtering

Process individual facts with O(1) complexity instead of reprocessing all data.

engine.processIncremental(
  fact: Fact,
  options?: ProjectionProcessOptions
): Promise<void>

Benefits:

  • O(1) complexity - Updates only affected bucket
  • Idempotent - Safe to process same fact multiple times
  • Real-time updates - Process facts as they arrive
  • Correct aggregations - Maintains running sums/counts for avg, count_distinct

const config = parseProjectionConfig(`
source: api_request
factTypes: [completed]
timeWindow: hour
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
materialize: true
storageVersion: 2
`);
const engine = new ProjectionEngine(config);
// Process individual fact incrementally
await engine.processIncremental(
  {
    id: 'fact_123',
    type: 'completed',
    timestamp: Date.UTC(2026, 2, 12, 10, 15),
    data: { duration_ms: 150 },
  },
  {
    cachedStateManager,
    projectionKey: 'api_metrics',
  }
);
// Only the 2026-03-12T10 bucket is read, updated, and written
// No need to load or reprocess other hours
processIncremental tracks processed fact IDs to prevent double-counting:

// Process same fact twice - second call is no-op
await engine.processIncremental(fact, options);
await engine.processIncremental(fact, options); // No effect
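
The combination of O(1) updates and idempotency can be sketched with a running sum and count per bucket plus a set of seen fact IDs. This is an illustration under assumptions, not the engine's internals: IncrementalAvg is a hypothetical class, and a real system would need to bound or persist the seen-ID set.

```typescript
// Hypothetical sketch: per-bucket running state with duplicate suppression.
interface RunningBucket {
  count: number;
  sum: number;
}

class IncrementalAvg {
  private readonly buckets = new Map<string, RunningBucket>();
  private readonly seen = new Set<string>();

  process(factId: string, bucketKey: string, value: number): void {
    if (this.seen.has(factId)) return; // Duplicate delivery: no effect
    this.seen.add(factId);
    const current = this.buckets.get(bucketKey) ?? { count: 0, sum: 0 };
    // Only the affected bucket is touched, regardless of how many buckets exist.
    this.buckets.set(bucketKey, { count: current.count + 1, sum: current.sum + value });
  }

  avg(bucketKey: string): number | undefined {
    const bucket = this.buckets.get(bucketKey);
    return bucket ? bucket.sum / bucket.count : undefined;
  }
}

const tracker = new IncrementalAvg();
tracker.process('fact_123', '2026-03-12T10', 150);
tracker.process('fact_123', '2026-03-12T10', 150); // Ignored: same fact ID
tracker.process('fact_124', '2026-03-12T10', 50);
console.log(tracker.avg('2026-03-12T10'));
```

Keeping the sum and count rather than the average itself is what lets avg stay correct as facts arrive one at a time or out of order.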

Late-arriving facts update historical buckets correctly:

// Process facts out of order
await engine.processIncremental(factFromMarch14, options); // Creates Mar 14 bucket
await engine.processIncremental(factFromMarch12, options); // Updates Mar 12 bucket
await engine.processIncremental(factFromMarch13, options); // Creates Mar 13 bucket
// Each bucket maintained independently

Automatically aggregate fine-grained buckets into coarser time windows (minute → hour → day → month).

source: api_request
factTypes: [completed]
timeWindow: minute
aggregations:
  - field: count
    function: count
materialize: true
rollup:
  enabled: true
  retention:
    minute: 7  # Keep 7 days of minute buckets
    hour: 90   # Keep 90 days of hour buckets
    day: 730   # Keep 2 years of day buckets
    month: 0   # Keep month buckets forever

minute (60 buckets) → hour (24 buckets) → day (28-31 buckets) → month

Rollup triggers:

  • Minute → Hour: Once all 60 minutes of an hour are complete
  • Hour → Day: At day boundary (24 hours complete)
  • Day → Month: At month boundary (all days in month complete)

| Function       | Rollup Method                      |
|----------------|------------------------------------|
| count          | Sum child counts                   |
| sum            | Sum child sums                     |
| avg            | Weighted average (using counts)    |
| min            | Min of child mins                  |
| max            | Max of child maxes                 |
| count_distinct | HLL approximation or disabled*     |

*count_distinct either uses HyperLogLog for approximate rollup or is skipped with documented limitation.
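
The rollup methods for count, sum, min, and max can be sketched as a merge of minute buckets into their parent hour. This is a simplified illustration: Stats and rollupToHour are hypothetical names, the hour key is assumed to be the first 13 characters of the minute key, and count_distinct is left out because it cannot be merged exactly from child results.

```typescript
// Hypothetical sketch: merge minute buckets into hour buckets.
interface Stats {
  count: number;
  sum: number;
  min: number;
  max: number;
}

function rollupToHour(minuteBuckets: Record<string, Stats>): Record<string, Stats> {
  const hours: Record<string, Stats> = {};
  for (const [minuteKey, s] of Object.entries(minuteBuckets)) {
    const hourKey = minuteKey.slice(0, 13); // "2026-03-12T10:15" -> "2026-03-12T10"
    const h = hours[hourKey];
    hours[hourKey] = h
      ? {
          count: h.count + s.count,    // Sum child counts
          sum: h.sum + s.sum,          // Sum child sums
          min: Math.min(h.min, s.min), // Min of child mins
          max: Math.max(h.max, s.max), // Max of child maxes
        }
      : { ...s };
  }
  return hours;
}

const hourly = rollupToHour({
  '2026-03-12T10:15': { count: 2, sum: 125, min: 50, max: 75 },
  '2026-03-12T10:16': { count: 1, sum: 2000, min: 2000, max: 2000 },
  '2026-03-12T11:00': { count: 1, sum: 80, min: 80, max: 80 },
});
console.log(hourly);
```

With sum and count carried along, the rolled-up average is simply sum / count, which is equivalent to the count-weighted average of the child averages.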

After rollup, fine-grained buckets older than retention period are deleted:

# Configuration with 7-day minute retention
rollup:
  enabled: true
  retention:
    minute: 7 # After 7 days, minute buckets are rolled up and deleted
# Timeline:
# Day 0-7: Minute buckets + hour buckets exist
# Day 8+: Only hour buckets (minute buckets deleted after rollup)

Retention value of 0 means keep forever (no cleanup).
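
The retention rule can be sketched as a simple age check. This covers only the age comparison, not the requirement that a bucket has already been rolled up; isExpired is a hypothetical helper, and treating a bucket exactly at the retention boundary as still retained is an assumption.

```typescript
// Hypothetical sketch of the retention age check.
const DAY_MS = 24 * 60 * 60 * 1000;

function isExpired(bucketStartMs: number, retentionDays: number, nowMs: number): boolean {
  if (retentionDays === 0) return false; // 0 means keep forever
  return nowMs - bucketStartMs > retentionDays * DAY_MS;
}

const now = Date.UTC(2026, 2, 20);
console.log(isExpired(Date.UTC(2026, 2, 12), 7, now)); // 8 days old with 7-day retention
console.log(isExpired(Date.UTC(2026, 2, 14), 7, now)); // 6 days old with 7-day retention
console.log(isExpired(Date.UTC(2020, 0, 1), 0, now));  // Retention 0
```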

const config = parseProjectionConfig(`
source: pageview
factTypes: [viewed]
timeWindow: minute
aggregations:
  - field: count
    function: count
  - field: data.duration_ms
    function: avg
materialize: true
storageVersion: 2
rollup:
  enabled: true
  retention:
    minute: 1
    hour: 7
    day: 90
`);
const engine = new ProjectionEngine(config);
// Process a week's worth of facts at minute granularity
await engine.process(weekOfFacts, {
  cachedStateManager,
  projectionKey: 'pageviews',
});
// Storage state after rollup:
// - Last 24 hours: minute + hour buckets
// - Last 7 days: hour + day buckets
// - Last 90 days: day buckets
// - Older: month buckets

  • minute - Real-time monitoring (heavy write load)
  • hour - Operational dashboards
  • day - Usage metering, billing
  • month - Business analytics, long-term trends

Each unique combination of groupBy values creates a bucket (per time window, if one is set). Avoid high-cardinality fields:

# ❌ BAD - Creates millions of buckets
groupBy: [data.user_id, data.session_id]
# ✅ GOOD - Bounded cardinality
groupBy: [data.country, data.plan_type]
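
A quick back-of-the-envelope check can make the cardinality risk concrete. The sketch below computes a worst-case upper bound by multiplying the number of time buckets by the cardinality of each groupBy field; estimateBucketCount is a hypothetical helper and the cardinalities are made-up illustrative numbers.

```typescript
// Hypothetical helper: worst-case bucket count = time buckets x product of cardinalities.
function estimateBucketCount(timeBuckets: number, cardinalities: ReadonlyArray<number>): number {
  return cardinalities.reduce((total, cardinality) => total * cardinality, timeBuckets);
}

// 30 daily buckets, 50 countries, 4 plan types (illustrative numbers)
const bounded = estimateBucketCount(30, [50, 4]);
// 30 daily buckets, 100,000 users, 5 sessions per user (illustrative numbers)
const unbounded = estimateBucketCount(30, [100_000, 5]);
console.log(bounded, unbounded);
```

Real bucket counts are usually lower because not every combination occurs, but the bound shows how quickly per-user or per-session grouping grows.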

If computing the projection is expensive (many facts, complex aggregations), set materialize: true.

For materialized projections, only process new facts:

// Track last processed fact ID
const lastFactId = cachedStateManager.get('daily_usage')?.factsThroughId;
// Get new facts only (assumes fact IDs sort in creation order);
// on the first run there is no watermark, so process everything
const newFacts = lastFactId ? facts.filter(f => f.id > lastFactId) : facts;
// Incremental update, skipped when nothing new arrived
if (newFacts.length > 0) {
  engine.process(newFacts, {
    cachedStateManager,
    projectionKey: 'daily_usage',
    factsThroughId: newFacts[newFacts.length - 1].id,
  });
}

// Validate each projection config file
import * as fs from 'node:fs';
import { parseProjectionConfig, ConfigValidationError } from '@z0-app/sdk';

const projectionYamls = ['daily_usage.yaml', 'error_rates.yaml'];
for (const yamlFile of projectionYamls) {
  try {
    const yaml = fs.readFileSync(yamlFile, 'utf-8');
    parseProjectionConfig(yaml); // Validate
  } catch (err) {
    if (err instanceof ConfigValidationError) {
      console.error(`Invalid projection config in ${yamlFile}:`);
      err.issues.forEach(issue => console.error(` - ${issue}`));
      process.exit(1);
    }
    throw err; // Surface YAML syntax and file errors instead of swallowing them
  }
}

Wrap ProjectionConfig in z0’s Config pattern for versioned configuration:

import type { Config, ProjectionConfigSettings } from '@z0-app/sdk';
import { ProjectionEngine } from '@z0-app/sdk';

// Store as versioned config
const projectionConfig: Config<ProjectionConfigSettings> = {
  id: 'proj_daily_usage',
  type: 'projection',
  scope: 'platform',
  version: 1,
  tenant_id: 'system',
  settings: {
    source: 'api_request',
    factTypes: ['completed'],
    timeWindow: 'day',
    aggregations: [{ field: 'count', function: 'count' }],
    materialize: true,
    storageVersion: 2,
  },
  effective_at: Date.now(),
};
// Construct engine from Config
const engine = new ProjectionEngine(projectionConfig);
// Config version tracked in metadata
const result = engine.process(facts, options);
// result._meta.config_version === 1

Benefits:

  • Audit trail of configuration changes
  • Rollback to previous versions
  • Track which config version produced which results

Enable storageVersion: 2 when:

  • Dataset has many time buckets (months/years of data)
  • High-cardinality groupBy fields
  • Need efficient time-range queries
  • Memory constraints during processing

storageVersion: 2 # Sparse storage

Track projection migrations using PROJECTION_FACT_TYPES:

import { PROJECTION_FACT_TYPES } from '@z0-app/sdk';

// Listen for migration events
factManager.on('fact', (fact) => {
  if (fact.type === PROJECTION_FACT_TYPES.MIGRATION_STARTED) {
    console.log('Migration started:', fact.data);
  }
  if (fact.type === PROJECTION_FACT_TYPES.MIGRATION_COMPLETED) {
    console.log('Migration completed in', fact.data.duration_ms, 'ms');
  }
});

| Feature | Description | Example |
|---------|-------------|---------|
| Aggregation Functions | count, sum, avg, min, max, count_distinct | { field: 'data.amount', function: 'sum' } |
| Time Windows | minute, hour, day, month | timeWindow: hour |
| Multi-Dimensional | groupBy multiple fields | groupBy: [data.country, data.plan] |
| Sparse Storage (v0.10.0+) | Each bucket as separate key | storageVersion: 2 |
| Time Range Queries (v0.10.0+) | Query specific date ranges | query(key, { from, to }) |
| Incremental Processing (v0.10.0+) | O(1) per-fact updates | processIncremental(fact) |
| Query Modes (v0.10.0+) | Materialized or compute | mode: 'compute' |
| Auto-Rollups (v0.10.0+) | Minute→hour→day→month | rollup: { enabled: true } |
| Materialization | Store in CachedState | materialize: true |
| Config Pattern (v0.10.0+) | Versioned configuration | Config<ProjectionConfigSettings> |
| Migration Facts (v0.10.0+) | Audit trail for changes | PROJECTION_FACT_TYPES |
| Nested Fields | Dot notation access | field: 'data.payment.amount' |

ProjectionEngine transforms raw event streams into queryable metrics using declarative YAML configuration. v0.10.0 adds sparse storage for efficiency, incremental processing for real-time updates, time range queries for targeted analysis, and auto-rollups for long-term data retention.