Background
I'm using StreamFeatureView with KafkaSource + local provider. The real-time ingestion path works well — StreamProcessor.ingest_stream_feature_view(to=ONLINE_AND_OFFLINE) consumes from Kafka, applies the pandas UDF, and writes transformed features to both online and offline stores.
Now I need to backfill historical data that predates the streaming pipeline. I have raw Kafka history available via ETL export, but I'm not sure what the recommended approach is to get UDF-transformed features into the offline store.
My setup
@stream_feature_view(
    source=kafka_source,  # KafkaSource with batch_source=FileSource(parquet)
    mode="pandas",
)
def my_features(df):
    # the decorated function is the UDF: computes derived features
    # (e.g. aggregations, flags) from raw fields
    ...
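For concreteness, a pandas-mode UDF of this shape might look like the sketch below. The field names (`amount`) and derived columns (`is_large`, `amount_z`) are hypothetical stand-ins for whatever the real UDF computes:

```python
import pandas as pd

def my_udf(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical raw field: 'amount'.
    df = df.copy()
    # Derived flag: mark large transactions.
    df["is_large"] = df["amount"] > 100.0
    # Derived feature: z-score of the amount within the batch.
    df["amount_z"] = (df["amount"] - df["amount"].mean()) / df["amount"].std()
    return df

raw = pd.DataFrame({"amount": [50.0, 150.0, 100.0]})
out = my_udf(raw)
print(list(out["is_large"]))  # prints [False, True, False]
```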
What I've tried
1. ETL export + materialize()
Export raw Kafka history to the batch_source parquet, then call store.materialize(start, end).
This reads the batch_source, applies the UDF, and writes to the online store — but the offline store is not updated. After materialize(), calling get_historical_features() still returns raw data without the UDF-computed derived features.
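To make the observed behavior concrete, here is a toy model of the two stores (a DataFrame and a dict standing in for Feast's offline and online stores; none of these names are Feast API). It mirrors what step 1 does: materialization reads raw rows from the batch_source, applies the UDF, and updates only the online store, so a subsequent historical read still sees raw rows:

```python
import pandas as pd

# Toy stand-ins: with the local file provider, batch_source and offline
# store share one table; the online store is a per-entity dict.
batch_source = pd.DataFrame({"driver_id": [1, 2], "amount": [50.0, 150.0]})
online_store = {}

def udf(df):
    df = df.copy()
    df["is_large"] = df["amount"] > 100.0
    return df

def toy_materialize():
    # Reads raw rows, applies the UDF, writes ONLINE only --
    # mirroring the behavior described above.
    transformed = udf(batch_source)
    for row in transformed.to_dict("records"):
        online_store[row["driver_id"]] = row

toy_materialize()
# The online store now has the derived feature...
assert online_store[2]["is_large"]
# ...but a "historical" read against the offline table still sees raw rows.
assert "is_large" not in batch_source.columns
```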
2. Re-ingest via StreamProcessor from Kafka
Use StreamProcessor.ingest_stream_feature_view(to=ONLINE_AND_OFFLINE) with the consumer offset reset so it consumes historical messages.
This only works within Kafka's retention window and doesn't scale for large history.
Questions
- Is there a recommended way to backfill the offline store with UDF-transformed data? I want the UDF defined on the StreamFeatureView to be the single source of truth, avoiding duplication in ETL scripts.
- Does materialize() intentionally skip the offline store? If so, what is the expected workflow for keeping the offline store in sync with UDF-transformed features after a historical backfill?
- batch_source and offline store are the same file: for local/file-based offline stores, materialize() reads from the batch_source parquet, which is the same file that write_to_offline_store() writes to. Even if materialize() were extended to write back to the offline store, this would create problems:
- UDF applied twice on subsequent runs (transformed rows re-read and re-transformed)
- Data duplication (raw + transformed rows coexist)
- Idempotency broken (each run appends more duplicates)
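The double-transform and duplication concern can be reproduced in a few lines of pandas, treating one DataFrame as both batch_source and offline store (all names illustrative, and the UDF is a deliberately non-idempotent stand-in):

```python
import pandas as pd

def udf(df):
    out = df.copy()
    out["amount"] = out["amount"] * 2  # stand-in transform (not idempotent)
    return out

# One table plays both roles: batch_source (read) and offline store (write).
table = pd.DataFrame({"driver_id": [1], "amount": [10.0]})

def materialize_and_write_back(table):
    # Hypothetical extension: materialize also appends its output to the
    # offline store, which here is the same table it just read from.
    return pd.concat([table, udf(table)], ignore_index=True)

table = materialize_and_write_back(table)  # raw + transformed rows coexist
table = materialize_and_write_back(table)  # transformed rows re-transformed

print(len(table))  # prints 4: row count doubles on every run
```

After two runs the table holds amounts 10, 20, 20, 40: the first run's output was re-read as input and transformed again, which is exactly the duplication and idempotency failure listed above.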
Is separating the batch_source (raw input) from the offline store (transformed output) something that has been considered? Or is there an existing pattern I'm missing?
Any guidance or pointers would be greatly appreciated! I may well be missing something in the existing workflow.