golang-samber-ro

Author: samber

Reactive streams and event-driven programming in Golang with samber/ro: a ReactiveX implementation with 150+ type-safe operators, cold/hot observables, 5 subject types (Publish, Behavior, Replay, Async, Unicast), declarative pipelines via Pipe, 40+ plugins (HTTP, cron, fsnotify, JSON, logging), automatic backpressure, error propagation, and Go context integration. Apply when using or adopting samber/ro, when the codebase imports github.com/samber/ro, or when building asynchronous...

npx skills add https://github.com/samber/cc-skills-golang --skill golang-samber-ro

Persona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.

Thinking mode: Use ultrathink when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.

samber/ro — Reactive Streams for Go

Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.

Official Resources:

This skill is not exhaustive. Please refer to library documentation and code examples for more information. Context7 can help as a discoverability platform.

Why samber/ro (Streams vs Slices)

Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators. samber/ro solves this with declarative, chainable stream operators.
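
For contrast, here is a minimal sketch of the manual wiring described above, using only the standard library. It is not samber/ro code; the function names (`generate`, `filterEven`, `label`, `runManualPipeline`) are hypothetical and exist only for illustration. Each stage needs its own goroutine, its own output channel, and its own `close`, which is the bookkeeping a declarative operator chain is meant to hide.

```go
package main

import "fmt"

// generate emits the integers in [start, end) and closes the channel when done.
func generate(start, end int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // forgetting this close blocks every downstream stage forever
		for i := start; i < end; i++ {
			out <- i
		}
	}()
	return out
}

// filterEven forwards only even values to its output channel.
func filterEven(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if v%2 == 0 {
				out <- v
			}
		}
	}()
	return out
}

// label converts each value into a string.
func label(in <-chan int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			out <- fmt.Sprintf("even-%d", v)
		}
	}()
	return out
}

// runManualPipeline wires the three stages together and drains the result.
func runManualPipeline(start, end int) []string {
	var results []string
	for s := range label(filterEven(generate(start, end))) {
		results = append(results, s)
	}
	return results
}

func main() {
	fmt.Println(runManualPipeline(0, 5)) // prints [even-0 even-2 even-4]
}
```

This sketch has no error propagation, cancellation, or timing; adding any of them means touching every stage.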

When to use which tool:

| Scenario | Tool | Why |
| --- | --- | --- |
| Transform a slice (map, filter, reduce) | samber/lo | Finite, synchronous, eager; no stream overhead needed |
| Simple goroutine fan-out with error handling | errgroup | Part of golang.org/x/sync (not the standard library), lightweight, sufficient for bounded concurrency |
| Infinite event stream (WebSocket, tickers, file watcher) | samber/ro | Declarative pipeline with backpressure, retry, timeout, combine |
| Real-time data enrichment from multiple async sources | samber/ro | CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source | samber/ro | Hot observables (Share/Subjects) handle multicast natively |

Key differences: lo vs ro

| Aspect | samber/lo | samber/ro |
| --- | --- | --- |
| Data | Finite slices | Infinite streams |
| Execution | Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return (T, error) per call | Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |
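
The eager versus lazy distinction can be made concrete with a small standard-library sketch. It does not use samber/lo or samber/ro; both function names are hypothetical. The eager version squares every input and builds an intermediate slice before filtering, while the lazy version handles one item at a time and stops as soon as it has enough results.

```go
package main

import "fmt"

// eagerFirstEvenSquares squares every input up front, then filters, then truncates.
// It returns the result and how many items were squared.
func eagerFirstEvenSquares(in []int, n int) ([]int, int) {
	squared := make([]int, 0, len(in)) // intermediate slice, fully materialized
	processed := 0
	for _, v := range in {
		squared = append(squared, v*v)
		processed++
	}
	var out []int
	for _, v := range squared {
		if v%2 == 0 {
			out = append(out, v)
		}
	}
	if len(out) > n {
		out = out[:n]
	}
	return out, processed
}

// lazyFirstEvenSquares processes items one by one and stops once n results exist.
func lazyFirstEvenSquares(in []int, n int) ([]int, int) {
	var out []int
	processed := 0
	for _, v := range in {
		if len(out) == n {
			break // downstream has enough; upstream work stops here
		}
		sq := v * v
		processed++
		if sq%2 == 0 {
			out = append(out, sq)
		}
	}
	return out, processed
}

func main() {
	in := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	fmt.Println(eagerFirstEvenSquares(in, 2)) // prints [4 16] 10
	fmt.Println(lazyFirstEvenSquares(in, 2))  // prints [4 16] 4
}
```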

Installation

go get github.com/samber/ro

Core Concepts

Four building blocks:

  1. Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
  2. Observer — a consumer with three callbacks: onNext(T), onError(error), onComplete()
  3. Operator — a function that transforms an observable into another observable, chained via Pipe
  4. Subscription — the connection between observable and observer. Call .Wait() to block or .Unsubscribe() to cancel
observable := ro.Pipe2(
    ro.RangeWithInterval(0, 5, 1*time.Second),
    ro.Filter(func(x int) bool { return x%2 == 0 }),
    ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)

observable.Subscribe(ro.NewObserver(
    func(s string) { fmt.Println(s) },      // onNext
    func(err error) { log.Println(err) },    // onError
    func() { fmt.Println("Done!") },         // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"

// Or collect synchronously:
values, err := ro.Collect(observable)

Cold vs Hot Observables

Cold (default): each .Subscribe() starts a new independent execution. Safe and predictable — use by default.

Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
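
The difference in semantics can be sketched with plain Go, without the library. The types below (`coldSource`, `hotSource`) are hypothetical stand-ins, not samber/ro types, and they are deliberately not goroutine-safe. The cold source re-runs its producer for every subscriber; the hot source has a single execution whose values reach only those subscribed at send time.

```go
package main

import "fmt"

// coldSource mimics a cold observable: every subscribe call re-runs the producer.
type coldSource struct {
	runs int // number of independent executions so far
}

func (c *coldSource) subscribe(onNext func(int)) {
	c.runs++
	for i := 1; i <= 3; i++ {
		onNext(i)
	}
}

// hotSource mimics a hot observable: one shared execution, values fan out
// to whoever is subscribed when send is called.
type hotSource struct {
	subscribers []func(int)
}

func (h *hotSource) subscribe(onNext func(int)) {
	h.subscribers = append(h.subscribers, onNext)
}

func (h *hotSource) send(v int) {
	for _, s := range h.subscribers {
		s(v)
	}
}

// demo returns what two subscribers observe from each kind of source.
func demo() (coldA, coldB, hotA, hotB []int, coldRuns int) {
	cold := &coldSource{}
	cold.subscribe(func(v int) { coldA = append(coldA, v) })
	cold.subscribe(func(v int) { coldB = append(coldB, v) })

	hot := &hotSource{}
	hot.subscribe(func(v int) { hotA = append(hotA, v) })
	hot.send(1)
	hot.subscribe(func(v int) { hotB = append(hotB, v) }) // late subscriber misses 1
	hot.send(2)
	hot.send(3)

	return coldA, coldB, hotA, hotB, cold.runs
}

func main() {
	coldA, coldB, hotA, hotB, runs := demo()
	fmt.Println(coldA, coldB, runs) // prints [1 2 3] [1 2 3] 2
	fmt.Println(hotA, hotB)         // prints [1 2 3] [2 3]
}
```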

| Convert with | Behavior |
| --- | --- |
| Share() | Cold → hot with reference counting. Last unsubscribe tears down |
| ShareReplay(n) | Same as Share + buffers last N values for late subscribers |
| Connectable() | Cold → hot, but waits for explicit .Connect() call |
| Subjects | Natively hot; call .Send(), .Error(), .Complete() directly |

| Subject | Constructor | Replay behavior |
| --- | --- | --- |
| PublishSubject | NewPublishSubject[T]() | None; late subscribers miss past events |
| BehaviorSubject | NewBehaviorSubject[T](initial) | Replays last value to new subscribers |
| ReplaySubject | NewReplaySubject[T](bufferSize) | Replays last N values |
| AsyncSubject | NewAsyncSubject[T]() | Emits only last value, only on complete |
| UnicastSubject | NewUnicastSubject[T](bufferSize) | Single subscriber only |
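
The replay column can be illustrated with a small standard-library sketch. `replayBroadcaster` is a hypothetical type, not the library's implementation, and it is not goroutine-safe. A buffer size of 0 behaves roughly like the "no replay" row, 1 like "replays last value" (minus the initial value), and N like "replays last N values".

```go
package main

import "fmt"

// replayBroadcaster keeps the last `size` values and replays them to late subscribers.
type replayBroadcaster struct {
	size        int
	buffer      []int
	subscribers []func(int)
}

func newReplayBroadcaster(size int) *replayBroadcaster {
	return &replayBroadcaster{size: size}
}

// send records the value in the bounded buffer, then delivers it to current subscribers.
func (r *replayBroadcaster) send(v int) {
	r.buffer = append(r.buffer, v)
	if len(r.buffer) > r.size {
		r.buffer = r.buffer[len(r.buffer)-r.size:] // drop the oldest entries
	}
	for _, s := range r.subscribers {
		s(v)
	}
}

// subscribe first replays the buffered values, then registers for live ones.
func (r *replayBroadcaster) subscribe(onNext func(int)) {
	for _, v := range r.buffer {
		onNext(v)
	}
	r.subscribers = append(r.subscribers, onNext)
}

// lateSubscriberSees sends 1..4, subscribes, sends 5, and returns what the subscriber saw.
func lateSubscriberSees(size int) []int {
	r := newReplayBroadcaster(size)
	for v := 1; v <= 4; v++ {
		r.send(v)
	}
	var seen []int
	r.subscribe(func(v int) { seen = append(seen, v) })
	r.send(5)
	return seen
}

func main() {
	fmt.Println(lateSubscriberSees(0)) // prints [5]
	fmt.Println(lateSubscriberSees(1)) // prints [4 5]
	fmt.Println(lateSubscriberSees(2)) // prints [3 4 5]
}
```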

For subject details and hot observable patterns, see Subjects Guide.

Operator Quick Reference

| Category | Key operators | Purpose |
| --- | --- | --- |
| Creation | Just, FromSlice, FromChannel, Range, Interval, Defer, Future | Create observables from various sources |
| Transform | Map, MapErr, FlatMap, Scan, Reduce, GroupBy | Transform or accumulate stream values |
| Filter | Filter, Take, TakeLast, Skip, Distinct, Find, First, Last | Selectively emit values |
| Combine | Merge, Concat, Zip2...Zip6, CombineLatest2...CombineLatest5, Race | Merge multiple observables |
| Error | Catch, OnErrorReturn, OnErrorResumeNextWith, Retry, RetryWithConfig | Recover from errors |
| Timing | Delay, DelayEach, Timeout, ThrottleTime, SampleTime, BufferWithTime | Control emission timing |
| Side effect | Tap/Do, TapOnNext, TapOnError, TapOnComplete | Observe without altering stream |
| Terminal | Collect, ToSlice, ToChannel, ToMap | Consume stream into Go types |

Use typed Pipe2, Pipe3 ... Pipe25 for compile-time type safety across operator chains. The untyped Pipe uses any and loses type checking.
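
Why the typed variants matter can be shown with a generic sketch that does not depend on the library. `typedPipe2`, `untypedPipe`, and `doubleAny` are hypothetical helpers and say nothing about how samber/ro implements its Pipe functions. In the typed version the compiler rejects a stage whose input type does not match the previous output; in the `any` version the same mismatch only surfaces when a value reaches the stage at runtime.

```go
package main

import (
	"fmt"
	"strconv"
)

// typedPipe2 chains two stages; the compiler checks that first's output
// type matches second's input type.
func typedPipe2[A, B, C any](in []A, first func(A) B, second func(B) C) []C {
	out := make([]C, 0, len(in))
	for _, v := range in {
		out = append(out, second(first(v)))
	}
	return out
}

// untypedPipe chains stages typed as any; a mismatch is only detected
// when a stage's type assertion fails at runtime.
func untypedPipe(in []any, stages ...func(any) (any, error)) ([]any, error) {
	out := make([]any, 0, len(in))
	for _, v := range in {
		cur := v
		for _, stage := range stages {
			next, err := stage(cur)
			if err != nil {
				return nil, err
			}
			cur = next
		}
		out = append(out, cur)
	}
	return out, nil
}

// doubleAny is an untyped stage that only works on ints.
func doubleAny(v any) (any, error) {
	n, ok := v.(int)
	if !ok {
		return nil, fmt.Errorf("doubleAny: want int, got %T", v)
	}
	return n * 2, nil
}

// demoTyped runs a chain whose types are verified at compile time.
func demoTyped() []string {
	return typedPipe2(
		[]int{1, 2, 3},
		func(x int) int { return x * 2 },
		func(x int) string { return "n=" + strconv.Itoa(x) },
	)
}

// demoUntyped feeds a string into an int-only stage; the error appears at runtime.
func demoUntyped() error {
	_, err := untypedPipe([]any{1, "two"}, doubleAny)
	return err
}

func main() {
	fmt.Println(demoTyped())   // prints [n=2 n=4 n=6]
	fmt.Println(demoUntyped()) // prints doubleAny: want int, got string
}
```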

For the complete operator catalog (150+ operators with signatures), see Operators Guide.

Common Mistakes

| Mistake | Why it fails | Fix |
| --- | --- | --- |
| Using ro.OnNext() without error handler | Errors are silently dropped, so bugs hide in production | Use ro.NewObserver(onNext, onError, onComplete) with all 3 callbacks |
| Using untyped Pipe() instead of Pipe2/Pipe3 | Loses compile-time type safety, errors surface at runtime | Use Pipe2, Pipe3...Pipe25 for typed operator chains |
| Forgetting .Unsubscribe() on infinite streams | Goroutine leak: the observable runs forever | Use TakeUntil(signal), context cancellation, or explicit Unsubscribe() |
| Using Share() when cold is sufficient | Unnecessary complexity, harder to reason about lifecycle | Use hot observables only when multiple consumers need the same stream |
| Using samber/ro for finite slice transforms | Stream overhead (goroutines, subscriptions) for a synchronous operation | Use samber/lo: it is simpler, faster, and purpose-built for slices |
| Not propagating context for cancellation | Streams ignore shutdown signals, causing resource leaks on termination | Chain ContextWithTimeout or ThrowOnContextCancel in the pipeline |

Best Practices

  1. Always handle all three events — use NewObserver(onNext, onError, onComplete), not just OnNext. Unhandled errors cause silent data loss
  2. Use Collect() for synchronous consumption — when the stream is finite and you need []T, Collect blocks until complete and returns the slice + error
  3. Prefer typed Pipe functions: Pipe2, Pipe3...Pipe25 catch type mismatches at compile time. Reserve untyped Pipe for dynamic operator chains
  4. Bound infinite streams — use Take(n), TakeUntil(signal), Timeout(d), or context cancellation. Unbounded streams leak goroutines
  5. Use Tap/Do for observability — log, trace, or meter emissions without altering the stream. Chain TapOnError for error monitoring
  6. Prefer samber/lo for simple transforms — if the data is a finite slice and you need Map/Filter/Reduce, use lo. Reach for ro when data arrives over time, from multiple sources, or needs retry/timeout/backpressure
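
The "bound infinite streams" practice is, underneath, ordinary context cancellation. The following standard-library sketch shows the mechanism that operators such as Take or TakeUntil are described as wrapping; `ticks`, `takeN`, and `demo` are hypothetical names and none of this is samber/ro code. The producer goroutine exits as soon as the consumer cancels the context, so nothing is left running.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ticks emits an increasing counter at the given interval until ctx is
// cancelled, then closes its channel so the goroutine does not leak.
func ticks(ctx context.Context, every time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		t := time.NewTicker(every)
		defer t.Stop()
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				// Guard the send too: the consumer may already be gone.
				select {
				case out <- i:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// takeN reads up to n values, then cancels the producer via the deferred cancel.
func takeN(n int, every time.Duration) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the otherwise infinite producer

	ch := ticks(ctx, every)
	var got []int
	for len(got) < n {
		v, ok := <-ch
		if !ok {
			break
		}
		got = append(got, v)
	}
	return got
}

// demo bounds an infinite ticker to its first three values.
func demo() []int {
	return takeN(3, 10*time.Millisecond)
}

func main() {
	fmt.Println(demo()) // prints [0 1 2]
}
```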

Plugin Ecosystem

40+ plugins extend ro with domain-specific operators:

| Category | Plugins | Import path prefix |
| --- | --- | --- |
| Encoding | JSON, CSV, Base64, Gob | plugins/encoding/... |
| Network | HTTP, I/O, FSNotify | plugins/http, plugins/io, plugins/fsnotify |
| Scheduling | Cron, ICS | plugins/cron, plugins/ics |
| Observability | Zap, Slog, Zerolog, Logrus, Sentry, Oops | plugins/observability/..., plugins/samber/oops |
| Rate limiting | Native, Ulule | plugins/ratelimit/... |
| Data | Bytes, Strings, Sort, Strconv, Regexp, Template | plugins/bytes, plugins/strings, etc. |
| System | Process, Signal | plugins/proc, plugins/signal |

For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.

For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.

If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.

Cross-References

  • → See samber/cc-skills-golang@golang-samber-lo skill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slice
  • → See samber/cc-skills-golang@golang-samber-mo skill for monadic types (Option, Result, Either) that compose with ro pipelines
  • → See samber/cc-skills-golang@golang-samber-hot skill for in-memory caching (also available as an ro plugin)
  • → See samber/cc-skills-golang@golang-concurrency skill for goroutine/channel patterns when reactive streams are overkill
  • → See samber/cc-skills-golang@golang-observability skill for monitoring reactive pipelines in production
