Subject Types: Reactive Programming with Clojure

Explore different Subject types in Reactive Extensions to handle hot and cold observables with examples in Clojure.

Introduction

In reactive programming, a Subject is both an observer and an observable: code can push values into it manually, and it forwards them to its subscribers. That makes Subjects a bridge between the imperative and reactive worlds. This article covers four Subject types (BehaviorSubject, ReplaySubject, AsyncSubject, and PublishSubject), with Clojure examples that illustrate how they differ.

Subject Types Overview

The Subject types in Reactive Extensions differ mainly in which previously emitted values a new observer receives when it subscribes. Here are the key Subject types:

  1. BehaviorSubject: Stores the latest value and emits it immediately to any new subscribers upon subscription. Ideal for maintaining current state information accessible at any time.

  2. ReplaySubject: Caches all emitted values and replays them, in order, to every new subscriber. Useful when late subscribers need to process past values.

  3. AsyncSubject: Emits only the last value, and only once the source completes, regardless of when observers subscribed.

  4. PublishSubject: Does not replay any emitted data to new subscribers, only the new data emitted after their subscription.

Each of these subjects can play an essential role depending on the requirement of the reactive data flows they are used within.

Clojure Implementation Examples

BehaviorSubject

A BehaviorSubject holds the most recently emitted value and delivers it immediately to any observer that subscribes. Clojure has no built-in Subject type, so the example below models one as a map of closures over two atoms.

    ;; Hand-rolled BehaviorSubject: a map of closures over two atoms.
    (defn behavior-subject [initial-value]
      (let [state     (atom initial-value) ; latest value
            observers (atom #{})]          ; set of subscribed callbacks
        {:emit      (fn [new-value]
                      (reset! state new-value)
                      ;; Notify every current observer of the new value.
                      (doseq [obs @observers]
                        (obs new-value)))
         :subscribe (fn [observer]
                      (swap! observers conj observer)
                      ;; New subscribers receive the current value at once.
                      (observer @state))}))

    (let [subject (behavior-subject 0)]
      ((:subscribe subject) #(println "Observer 1:" %)) ; prints "Observer 1: 0"
      ((:emit subject) 1)                               ; prints "Observer 1: 1"
      ((:subscribe subject) #(println "Observer 2:" %)) ; prints "Observer 2: 1"
      ((:emit subject) 2))                              ; both observers print 2, in no guaranteed order
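
Clojure's built-in watches offer a closely related mechanism. As a minimal sketch, assuming single-threaded use, the hypothetical `watch-subject` helper below (not part of any library) gets BehaviorSubject-like behavior from an atom plus `add-watch`, and it returns a function that cancels the subscription. The names `watch-subject`, `:obs-1`, `:obs-2`, and `received` are illustrative only.

```clojure
;; Sketch only: BehaviorSubject-like behavior from an atom and add-watch.
;; `watch-subject` is a hypothetical helper, not a library function.
(defn watch-subject [initial-value]
  (let [state (atom initial-value)]
    {:emit      (fn [new-value] (reset! state new-value))
     :subscribe (fn [watch-key observer]
                  ;; Deliver the current value first, then watch for changes.
                  ;; Under concurrency a value emitted between these two
                  ;; steps could be missed; this sketch assumes one thread.
                  (observer @state)
                  (add-watch state watch-key
                             (fn [_key _ref _old new-value]
                               (observer new-value)))
                  ;; Return a function that cancels the subscription.
                  (fn [] (remove-watch state watch-key)))}))

;; Collect deliveries in a vector so the order is easy to inspect.
(def received (atom []))

(let [subject     (watch-subject 0)
      unsubscribe ((:subscribe subject) :obs-1
                   #(swap! received conj [:obs-1 %]))]
  ((:emit subject) 1)
  ((:subscribe subject) :obs-2 #(swap! received conj [:obs-2 %]))
  (unsubscribe)            ; :obs-1 stops receiving values here
  ((:emit subject) 2))

@received
;; => [[:obs-1 0] [:obs-1 1] [:obs-2 1] [:obs-2 2]]
```

Each watch needs a unique key, which is why `:subscribe` takes one here. Whether this is preferable to an explicit observer set depends on whether the state atom is already shared with other code.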

ReplaySubject

A ReplaySubject records every emitted value and replays the full history, in order, to each new subscriber before delivering later values. The example below keeps that history in a vector.

    ;; Hand-rolled ReplaySubject: keeps the full emission history.
    (defn replay-subject []
      (let [history   (atom [])   ; every value emitted so far, in order
            observers (atom #{})] ; set of subscribed callbacks
        {:emit      (fn [new-value]
                      (swap! history conj new-value)
                      (doseq [obs @observers]
                        (obs new-value)))
         :subscribe (fn [observer]
                      ;; Replay the history first, then register for new values.
                      (doseq [value @history]
                        (observer value))
                      (swap! observers conj observer))}))

    (let [subject (replay-subject)]
      ((:emit subject) 1)
      ((:emit subject) 2)
      ((:subscribe subject) #(println "Observer:" %)) ; prints 1, then 2 (replayed)
      ((:emit subject) 3))                            ; prints "Observer: 3"
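
The overview also lists PublishSubject and AsyncSubject, which have no example above. The following is a minimal sketch of both in the same map-of-closures style, assuming single-threaded use. The `:complete` key, the `::none` sentinel, and the `log` atom are assumptions introduced for illustration and do not come from any library.

```clojure
;; Sketch only: PublishSubject forwards values to current observers
;; and keeps no history for late subscribers.
(defn publish-subject []
  (let [observers (atom [])]
    {:emit      (fn [value]
                  (doseq [obs @observers]
                    (obs value)))
     :subscribe (fn [observer]
                  (swap! observers conj observer)
                  nil)}))

;; Sketch only: AsyncSubject delivers just the last value, and only
;; after :complete has been called.
(defn async-subject []
  (let [last-value (atom ::none) ; ::none marks "nothing emitted yet"
        done?      (atom false)
        observers  (atom [])]
    {:emit      (fn [value]
                  (when-not @done?
                    (reset! last-value value)))
     :complete  (fn []
                  ;; compare-and-set! makes a second :complete a no-op.
                  (when (compare-and-set! done? false true)
                    (when-not (= ::none @last-value)
                      (doseq [obs @observers]
                        (obs @last-value)))))
     :subscribe (fn [observer]
                  (if @done?
                    ;; Late subscribers still receive the final value.
                    (when-not (= ::none @last-value)
                      (observer @last-value))
                    (swap! observers conj observer)))}))

(def log (atom []))

(let [subject (publish-subject)]
  ((:emit subject) 1)                                  ; no observers yet, dropped
  ((:subscribe subject) #(swap! log conj [:publish %]))
  ((:emit subject) 2))

(let [subject (async-subject)]
  ((:subscribe subject) #(swap! log conj [:async-early %]))
  ((:emit subject) 1)
  ((:emit subject) 2)
  ((:complete subject))
  ((:subscribe subject) #(swap! log conj [:async-late %])))

@log
;; => [[:publish 2] [:async-early 2] [:async-late 2]]
```

The publish observer never sees 1 because it subscribed afterwards, while both async observers see only 2, the last value before completion.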

UML Sequence Diagram

Below is a UML sequence diagram (in Mermaid syntax) illustrating how a Subject that replays its latest value, such as a BehaviorSubject, interacts with two observers. Observer 2 subscribes after 1 has been emitted and still receives it:

    sequenceDiagram
        participant S as Subject
        participant O1 as Observer 1
        participant O2 as Observer 2

        O1->>S: Subscribe
        S->>O1: Emit 1
        O1-->>S: Receive
        O2->>S: Subscribe
        S->>O2: Emit 1
        O2-->>S: Receive
        S->>S: Emit 2
        S->>O1: Emit 2
        S->>O2: Emit 2
        O1-->>S: Receive
        O2-->>S: Receive

Related Patterns

  • Observer Pattern: A foundational pattern in reactive systems, enabling objects to notify subscribed observers of state changes.

  • Command Pattern: In reactive systems, it can encapsulate commands as observable sequences.

Additional Resources

  • ReactiveX Documentation: ReactiveX.io
  • Clojure’s core.async library: Deepen understanding of asynchronous programming in Clojure.

Conclusion

Subjects let imperative code push values into reactive streams and deliver them to many observers. The Subject types differ mainly in what a late subscriber sees: the latest value (BehaviorSubject), the full history (ReplaySubject), only the final value on completion (AsyncSubject), or nothing but future values (PublishSubject). Understanding these differences equips us to choose the right tool for the task when building event-driven applications in Clojure.