
How to backfill offline store with UDF-transformed data for StreamFeatureView? #6281

@cutoutsy

Description

Background

I'm using StreamFeatureView with KafkaSource + local provider. The real-time ingestion path works well — StreamProcessor.ingest_stream_feature_view(to=ONLINE_AND_OFFLINE) consumes from Kafka, applies the pandas UDF, and writes transformed features to both online and offline stores.

Now I need to backfill historical data that predates the streaming pipeline. I have raw Kafka history available via ETL export, but I'm not sure what the recommended approach is to get UDF-transformed features into the offline store.

My setup

@stream_feature_view(
    source=kafka_source,   # KafkaSource with batch_source=FileSource(parquet)
    mode="pandas",
    udf=my_udf,            # computes derived features (e.g. aggregations, flags) from raw fields
)
def my_features(df): ...

What I've tried

1. ETL export + materialize()

Export raw Kafka history to batch_source parquet, then call store.materialize(start, end).

This reads the batch_source, applies the UDF, and writes to the online store — but the offline store is not updated. After materialize(), calling get_historical_features() still returns raw data without the UDF-computed derived features.
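For concreteness, here is a minimal pandas sketch of the gap (the UDF body and column names are made up for illustration): the derived column is exactly the kind of feature computed during materialize() that never shows up in the offline store afterwards.

```python
import pandas as pd

# Hypothetical raw rows, standing in for the Kafka history that was
# exported to the batch_source parquet (column names are made up).
raw = pd.DataFrame({
    "driver_id": [1, 1, 2],
    "trip_count": [3, 5, 2],
})

# Hypothetical pandas UDF, mirroring the kind of derived feature the
# StreamFeatureView computes on ingest.
def my_udf(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["is_active"] = out["trip_count"] >= 3  # derived flag
    return out

transformed = my_udf(raw)
print(list(transformed.columns))  # ['driver_id', 'trip_count', 'is_active']
```

After materialize(), the online store holds rows shaped like `transformed`, but get_historical_features() still returns only the `raw` columns.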

2. Re-ingest via StreamProcessor from Kafka

Use StreamProcessor.ingest_stream_feature_view(to=ONLINE_AND_OFFLINE) with the consumer offset reset to the beginning, so historical messages are replayed through the streaming path.

This only works within Kafka's retention window and doesn't scale for large history.

Questions

  1. Is there a recommended way to backfill the offline store with UDF-transformed data? I want the UDF defined on the StreamFeatureView to be the single source of truth, avoiding duplication in ETL scripts.

  2. Does materialize() intentionally skip the offline store? If so, what is the expected workflow for keeping the offline store in sync with UDF-transformed features after a historical backfill?

  3. The batch_source and the offline store are the same file. For local/file-based offline stores, materialize() reads from the batch_source parquet, which is the same file that write_to_offline_store() writes to. Even if materialize() were extended to write back to the offline store, that would create problems:

    • UDF applied twice on subsequent runs (transformed rows re-read and re-transformed)
    • Data duplication (raw + transformed rows coexist)
    • Idempotency broken (each run appends more duplicates)

    Is separating the batch_source (raw input) from the offline store (transformed output) something that has been considered? Or is there an existing pattern I'm missing?
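This failure mode can be sketched without Feast at all. In the toy below, a hypothetical UDF and a single in-memory DataFrame (standing in for the shared parquet file) show how each read-transform-append cycle re-transforms previously written rows and compounds duplicates:

```python
import pandas as pd

# Hypothetical UDF: flags rows whose raw value exceeds a threshold.
def my_udf(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["high_value_flag"] = out["value"] > 100
    return out

# One shared table standing in for batch_source == offline store file.
store = pd.DataFrame({"driver_id": [1, 2], "value": [50, 150]})

def fake_materialize(store: pd.DataFrame) -> pd.DataFrame:
    # Reads the whole file (raw and previously transformed rows alike),
    # applies the UDF, and appends the result back: the behaviour
    # question 3 warns about.
    transformed = my_udf(store)
    return pd.concat([store, transformed], ignore_index=True)

store = fake_materialize(store)  # 2 raw + 2 transformed = 4 rows
store = fake_materialize(store)  # re-transforms all 4 and appends: 8 rows
print(len(store))                # 8; duplicates grow on every run
```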

Any guidance or pointers would be greatly appreciated! I may well be missing something in the existing workflow.
