Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 113 additions & 66 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
# FlowState

> **Model workflows without magic.**
> **Model workflows cleanly, explicitly, and with durable persistence between steps.**

---

**FlowState** provides a clean, Rails-native way to model **stepped workflows** as explicit, durable state machines.
It lets you define each step, move between states deliberately, and track execution — without relying on metaprogramming, `method_missing`, or hidden magic.
**FlowState** provides a clean, Rails-native way to model **stepped workflows** as explicit, durable workflows, with support for persisting arbitrary artefacts between transitions. It lets you define each step, move between states safely, track execution history, and persist payloads ("artefacts") in a type-safe way — without using metaprogramming, `method_missing`, or other hidden magic.

Every workflow instance is persisted to the database.
Every transition is logged.
Every change happens through clear, intention-revealing methods you define yourself.
Perfect for workflows that rely on third party resources and integrations.
Every workflow instance, transition and artefact is persisted to the database.
Every change happens through clear, intention-revealing methods that you define yourself.

Built for real-world systems where you need to:
- Track complex, multi-step processes
- Handle failures gracefully
- Persist state safely across asynchronous jobs
- Handle failures gracefully with error states and retries
- Persist state and interim data across asynchronous jobs
- Store and type-check arbitrary payloads (artefacts) between steps
- Avoid race conditions via database locks and explicit guards

---

## Key Features

- **Explicit transitions** — Every state change is triggered manually via a method you define.
- **Full execution history** — Every transition is recorded with timestamps and a history table.
- **Error recovery** — Model and track failures directly with error states.
- **Typed payloads** — Strongly-typed metadata attached to every workflow.
- **Persistence-first** — Workflow state is stored in your database, not memory.
- **No Magic** — No metaprogramming, no dynamic method generation, no `method_missing` tricks.
- **Explicit transitions** — Every state change is triggered manually via a method you define.
- **Full execution history** — Every transition is recorded with timestamps and a history table.
- **Error recovery** — Model and track failures directly with error states.
- **Typed payloads** — Strongly-typed metadata attached to every workflow.
- **Artefact persistence** — Declare named and typed artefacts to persist between specific transitions.
- **Guard clauses** — Protect transitions with guards that raise if conditions aren’t met.
- **Persistence-first** — Workflow state and payloads are stored in your database, not memory.
- **No Magic** — No metaprogramming, no dynamic method generation, no `method_missing` tricks.

---

Expand All @@ -46,88 +49,105 @@ bin/rails db:migrate

---

## Example: Syncing song data with Soundcharts
## Example: Saving a third party API response to local database

Suppose you want to build a workflow that:
- Gets song metadata from Soundcharts
- Then fetches audience data
- Tracks each step and handles retries on failure
- Fetches a response from a third party API
- Allows for retrying the fetch on failure
- And persists the response to the workflow
- Then saves the persisted response to the database
- As two separate, encapsulated jobs
- Tracking each step, while protecting against race conditions

---

### Define your Flow

```ruby
class SyncSoundchartsFlow < FlowState::Base
prop :song_id, String
class SyncThirdPartyApiFlow < FlowState::Base
prop :my_record_id, String
prop :third_party_id, String

state :pending
state :picked
state :syncing_song_metadata
state :synced_song_metadata
state :syncing_audience_data
state :synced_audience_data
state :fetching_third_party_api
state :fetched_third_party_api
state :failed_to_fetch_third_party_api, error: true
state :saving_my_record
state :saved_my_record
state :failed_to_save_my_record, error: true
state :completed

error_state :failed_to_sync_song_metadata
error_state :failed_to_sync_audience_data
persist :third_party_api_response

initial_state :pending

def pick!
transition!(
from: %i[pending completed failed_to_sync_song_metadata failed_to_sync_audience_data],
from: %i[pending],
to: :picked,
after_transition: -> { sync_song_metadata }
after_transition: -> { enqueue_fetch }
)
end

def start_song_metadata_sync!
transition!(from: :picked, to: :syncing_song_metadata)
end

def finish_song_metadata_sync!
def start_third_party_api_request!
transition!(
from: :syncing_song_metadata, to: :synced_song_metadata,
after_transition: -> { sync_audience_data }
from: %i[picked failed_to_fetch_third_party_api],
to: :fetching_third_party_api
)
end

def fail_song_metadata_sync!
transition!(from: :syncing_song_metadata, to: :failed_to_sync_song_metadata)
def finish_third_party_api_request!(result)
transition!(
from: :fetching_third_party_api,
to: :fetched_third_party_api,
persists: :third_party_api_response,
after_transition: -> { enqueue_save }
) { result }
end

def start_audience_data_sync!
transition!(from: :synced_song_metadata, to: :syncing_audience_data)
def fail_third_party_api_request!
transition!(
from: :fetching_third_party_api,
to: :failed_to_fetch_third_party_api
)
end

def start_record_save!
transition!(
from: %i[fetched_third_party_api failed_to_save_my_record],
to: :saving_my_record,
guard: -> { flow_artefacts.where(name: 'third_party_api_response').exists? }
)
end

def finish_audience_data_sync!
def finish_record_save!
transition!(
from: :syncing_audience_data, to: :synced_audience_data,
from: :saving_my_record,
to: :saved_my_record,
after_transition: -> { complete! }
)
end

def fail_audience_data_sync!
transition!(from: :syncing_audience_data, to: :failed_to_sync_audience_data)
def fail_record_save!
transition!(
from: :saving_my_record,
to: :failed_to_save_my_record
)
end

def complete!
transition!(from: :synced_audience_data, to: :completed, after_transition: -> { destroy })
transition!(from: :saved_my_record, to: :completed, after_transition: -> { destroy })
end

private

def song
@song ||= Song.find(song_id)
end

def sync_song_metadata
SyncSoundchartsSongJob.perform_later(flow_id: id)
def enqueue_fetch
FetchThirdPartyJob.perform_later(flow_id: id)
end

def sync_audience_data
SyncSoundchartsAudienceJob.perform_later(flow_id: id)
def enqueue_save
SaveLocalRecordJob.perform_later(flow_id: id)
end
end
```
Expand All @@ -140,54 +160,81 @@ Each job moves the flow through the correct states, step-by-step.

---

**Sync song metadata**
**Create and start the flow**

```ruby
class SyncSoundchartsSongJob < ApplicationJob
flow = SyncThirdPartyApiFlow.create(
my_record_id: "my_local_record_id",
third_party_id: "some_service_id"
)

flow.pick!
```

---

**Fetch Third Party API Response**

```ruby
class FetchThirdPartyJob < ApplicationJob
retry_on StandardError,
wait: ->(executions) { 10.seconds * (2**executions) },
attempts: 3

def perform(flow_id:)
@flow_id = flow_id

flow.start_song_metadata_sync!
flow.start_third_party_api_request!

# Fetch song metadata from Soundcharts etc
response = ThirdPartyApiRequest.new(id: flow.third_party_id).to_h

flow.finish_song_metadata_sync!
flow.finish_third_party_api_request!(response)
rescue
flow.fail_song_metadata_sync!
flow.fail_third_party_api_request!
raise
end

private

def flow
@flow ||= SyncSoundchartsFlow.find(@flow_id)
@flow ||= SyncThirdPartyApiFlow.find(@flow_id)
end
end
```

---

**Sync audience data**
**Save Result to Local Database**

```ruby
class SyncSoundchartsAudienceJob < ApplicationJob
class SaveLocalRecordJob < ApplicationJob
def perform(flow_id:)
@flow_id = flow_id

flow.start_audience_data_sync!
flow.start_record_save!

# Fetch audience data from Soundcharts etc
record.update!(payload: third_party_payload)

flow.finish_audience_data_sync!
flow.finish_record_save!
rescue
flow.fail_audience_data_sync!
flow.fail_record_save!
raise
end

private

def flow
@flow ||= SyncSoundchartsFlow.find(@flow_id)
@flow ||= SyncThirdPartyApiFlow.find(@flow_id)
end

def third_party_payload
flow.flow_artefacts
.find_by!(name: 'third_party_api_response')
.payload
end

def record
@record ||= MyRecord.find(flow.my_record_id)
end
end
```
Expand Down
Loading