ray-data
작성자: firecrawl
ML 워크로드를 위한 확장 가능한 데이터 처리. CPU/GPU에서 스트리밍 실행, Parquet/CSV/JSON/이미지 지원. Ray Train, PyTorch 등과 통합.
npx skills add https://github.com/firecrawl/ai-research-skills --skill ray-dataRay Data - Scalable ML Data Processing
Distributed data processing library for ML and AI workloads.
When to use Ray Data
Use Ray Data when:
- Processing large datasets (>100GB) for ML training
- Need distributed data preprocessing across cluster
- Building batch inference pipelines
- Loading multi-modal data (images, audio, video)
- Scaling data processing from laptop to cluster
Key features:
- Streaming execution: Process data larger than memory
- GPU support: Accelerate transforms with GPUs
- Framework integration: PyTorch, TensorFlow, HuggingFace
- Multi-modal: Images, Parquet, CSV, JSON, audio, video
Use alternatives instead:
- Pandas: Small data (<1GB) on single machine
- Dask: Tabular data, SQL-like operations
- Spark: Enterprise ETL, SQL queries
Quick start
Installation
pip install -U 'ray[data]'
Load and transform data
import ray
# Read Parquet files
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# Transform data (lazy execution)
ds = ds.map_batches(lambda batch: {"processed": batch["text"].str.lower()})
# Consume data
for batch in ds.iter_batches(batch_size=100):
print(batch)
Integration with Ray Train
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")
def train_func(config):
# Access dataset in training
train_ds = ray.train.get_dataset_shard("train")
for epoch in range(10):
for batch in train_ds.iter_batches(batch_size=32):
# Train on batch
pass
# Train with Ray
trainer = TorchTrainer(
train_func,
datasets={"train": train_ds},
scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
trainer.fit()
Reading data
From cloud storage
import ray
# Parquet (recommended for ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")
# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")
# Images
ds = ray.data.read_images("s3://bucket/images/")
From Python objects
# From list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])
# From range
ds = ray.data.range(1000000) # Synthetic data
# From pandas
import pandas as pd
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
Transformations
Map batches (vectorized)
# Batch transformation (fast)
def process_batch(batch):
batch["doubled"] = batch["value"] * 2
return batch
ds = ds.map_batches(process_batch, batch_size=1000)
Row transformations
# Row-by-row (slower)
def process_row(row):
row["squared"] = row["value"] ** 2
return row
ds = ds.map(process_row)
Filter
# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)
Group by and aggregate
# Group by column
ds = ds.groupby("category").count()
# Custom aggregation
ds = ds.groupby("category").map_groups(lambda group: {"sum": group["value"].sum()})
GPU-accelerated transforms
# Use GPU for preprocessing
def preprocess_images_gpu(batch):
import torch
images = torch.tensor(batch["image"]).cuda()
# GPU preprocessing
processed = images * 255
return {"processed": processed.cpu().numpy()}
ds = ds.map_batches(
preprocess_images_gpu,
batch_size=64,
num_gpus=1 # Request GPU
)
Writing data
# Write to Parquet
ds.write_parquet("s3://bucket/output/")
# Write to CSV
ds.write_csv("output/")
# Write to JSON
ds.write_json("output/")
Performance optimization
Repartition
# Control parallelism
ds = ds.repartition(100) # 100 blocks for 100-core cluster
Batch size tuning
# Larger batches = faster vectorized ops
ds.map_batches(process_fn, batch_size=10000) # vs batch_size=100
Streaming execution
# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
process(batch) # Streamed, not loaded to memory
Common patterns
Batch inference
import ray
# Load model
def load_model():
# Load once per worker
return MyModel()
# Inference function
class BatchInference:
def __init__(self):
self.model = load_model()
def __call__(self, batch):
predictions = self.model(batch["input"])
return {"prediction": predictions}
# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(BatchInference, batch_size=32, num_gpus=1)
predictions.write_parquet("s3://output/")
Data preprocessing pipeline
# Multi-step pipeline
ds = (
ray.data.read_parquet("s3://raw/")
.map_batches(clean_data)
.map_batches(tokenize)
.map_batches(augment)
.write_parquet("s3://processed/")
)
Integration with ML frameworks
PyTorch
# Convert to PyTorch
torch_ds = ds.to_torch(label_column="label", batch_size=32)
for batch in torch_ds:
# batch is dict with tensors
inputs, labels = batch["features"], batch["label"]
TensorFlow
# Convert to TensorFlow
tf_ds = ds.to_tf(feature_columns=["image"], label_column="label", batch_size=32)
for features, labels in tf_ds:
# Train model
pass
Supported data formats
| Format | Read | Write | Use Case |
|---|---|---|---|
| Parquet | ✅ | ✅ | ML data (recommended) |
| CSV | ✅ | ✅ | Tabular data |
| JSON | ✅ | ✅ | Semi-structured |
| Images | ✅ | ❌ | Computer vision |
| NumPy | ✅ | ✅ | Arrays |
| Pandas | ✅ | ❌ | DataFrames |
Performance benchmarks
Scaling (processing 100GB data):
- 1 node (16 cores): ~30 minutes
- 4 nodes (64 cores): ~8 minutes
- 16 nodes (256 cores): ~2 minutes
GPU acceleration (image preprocessing):
- CPU only: 1,000 images/sec
- 1 GPU: 5,000 images/sec
- 4 GPUs: 18,000 images/sec
Use cases
Production deployments:
- Pinterest: Last-mile data processing for model training
- ByteDance: Scaling offline inference with multi-modal LLMs
- Spotify: ML platform for batch inference
References
- Transformations Guide - Map, filter, groupby operations
- Integration Guide - Ray Train, PyTorch, TensorFlow
Resources
- Docs: https://docs.ray.io/en/latest/data/data.html
- GitHub: https://github.com/ray-project/ray ⭐ 36,000+
- Version: Ray 2.40.0+
- Examples: https://docs.ray.io/en/latest/data/examples/overview.html
firecrawl의 다른 스킬
oracle
firecrawl
oracle CLI 사용 모범 사례 (프롬프트 + 파일 번들링, 엔진, 세션 및 파일 첨부 패턴)
official
firecrawl-monitor
firecrawl
웹사이트 콘텐츠 변경을 감지하고 웹훅이나 이메일로 알림을 받습니다 — 크론 작업, 스크래퍼, diff 스크립트가 필요하지 않습니다. 사용자가 페이지 변경 사항을 추적하거나, 경쟁사 가격을 모니터링하거나, 새 채용 공고나 블로그 게시물에 대한 알림을 받거나, 문서/변경 로그/상태 페이지를 모니터링하거나, "모니터링", "감시", "추적", "변경 시 알림", "X가 변경되면 알림", "변경되면 알려줘", "변경 시 이메일 보내줘", "웹훅 보내줘"라고 말할 때 이 스킬을 사용하세요. 내장된 AI 판별기가 포맷, 타임스탬프 등을 필터링합니다...
officialweb-scrapingresearch
firecrawl-deep-research
firecrawl
Firecrawl을 사용하여 다중 소스 심층 연구를 실행합니다. 사용자가 주제를 조사하거나, 관점을 비교하거나, 출처가 포함된 브리핑을 작성하거나, 기술적 또는 시장 관련 질문을 조사하거나, 여러 소스의 웹 증거를 종합하도록 요청할 때 사용하세요.
officialresearchweb-scraping
firecrawl-research-papers
firecrawl
Firecrawl을 사용하여 연구 논문, 백서, PDF, 기술 보고서 및 학술 자료를 찾고 종합합니다. 사용자가 문헌 검토, 논문 요약, 연구 동향, 또는 PDF 및 학술/산업 간행물에서 출처가 포함된 종합 정보를 원할 때 사용하세요.
officialresearchweb-scraping
firecrawl-market-research
firecrawl
Firecrawl을 사용하여 시장, 재무, 실적, 산업 및 기업 지표를 추출합니다. 사용자가 시장 조사, 산업 동향, 상장 기업 데이터, 재무 비교, 실적 조사 또는 구조화된 시장 보고서를 요청할 때 사용하세요.
officialresearchweb-scraping
firecrawl-website-design-clone
firecrawl
Firecrawl 스크레이프 증거를 사용하여 모든 웹사이트의 디자인 시스템을 에이전트가 사용할 수 있는 DESIGN.md로 추출합니다. 사용자가 웹사이트의 색상, 글꼴, 간격, 구성 요소, 레이아웃 패턴 또는 브랜드/UI 가이드를 원할 때 사용하여 AI 에이전트가 새 웹사이트를 만들거나, 디자인을 복제하거나, 해당 디자인에서 영감을 받은 페이지를 구축할 수 있도록 합니다.
officialdesignweb-scraping
firecrawl-knowledge-base
firecrawl
Firecrawl을 사용하여 웹 콘텐츠로 지식 베이스를 구축하세요. 로컬 참조 문서, RAG 준비 청크, 파인튜닝 데이터셋, 문서 미러, 주제 코퍼스 또는 웹 소스에서 정리된 LLM 준비 마크다운에 사용할 수 있습니다.
officialweb-scrapingresearch
firecrawl-lead-research
firecrawl
Firecrawl을 사용하여 회의 전 리드 인텔리전스 브리핑을 생성합니다. 사용자가 영업 통화, 파트너십 회의, 투자자 대화 또는 고객 인터뷰 전에 회사 조사, 인물 조사, 최신 뉴스, 대화 포인트, 문제점 또는 아웃리치 준비가 필요할 때 사용합니다.
officialresearchweb-scraping