From efe03cdb285a5bafc50ae41f8d59fab33aad300b Mon Sep 17 00:00:00 2001 From: Chris Garrett Date: Tue, 22 Apr 2025 10:42:41 +0100 Subject: [PATCH 1/6] Deprecate error_state in favour of error: flag --- lib/flow_state/base.rb | 24 +++++++++++++++++------- lib/flow_state/version.rb | 2 +- spec/flow_state_spec.rb | 2 +- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/lib/flow_state/base.rb b/lib/flow_state/base.rb index 9b1da78..f4dac54 100644 --- a/lib/flow_state/base.rb +++ b/lib/flow_state/base.rb @@ -7,8 +7,9 @@ class UnknownStateError < StandardError; end class InvalidTransitionError < StandardError; end class PayloadValidationError < StandardError; end + DEPRECATOR = ActiveSupport::Deprecation.new(FlowState::VERSION, 'FlowState') + self.table_name = 'flow_state_flows' - # self.abstract_class = true - this stops Rails respecting STI has_many :flow_transitions, class_name: 'FlowState::FlowTransition', @@ -17,13 +18,19 @@ class PayloadValidationError < StandardError; end dependent: :destroy class << self - def state(name) - all_states << name.to_sym + def state(name, error: false) + name = name.to_sym + all_states << name + error_states << name if error end def error_state(name) - error_states << name.to_sym - state(name) + DEPRECATOR.warn( + 'FlowState::Base.error_state is deprecated. ' \ + 'Use state(name, error: true) instead.' + ) + + state(name, error: true) end def initial_state(name = nil) @@ -61,7 +68,8 @@ def transition!(from:, to:, after_transition: nil) # rubocop:disable Metrics/Abc with_lock do unless from.include?(current_state&.to_sym) - raise InvalidTransitionError, "state #{current_state} not in #{from} (#{from.inspect}->#{to.inspect}" + raise InvalidTransitionError, + "state #{current_state} not in #{from} (#{from.inspect}->#{to.inspect}" end transaction do @@ -76,7 +84,9 @@ def transition!(from:, to:, after_transition: nil) # rubocop:disable Metrics/Abc after_transition&.call end - def errored? = self.class.error_states.include?(current_state&.to_sym) + def errored? + self.class.error_states.include?(current_state&.to_sym) + end private diff --git a/lib/flow_state/version.rb b/lib/flow_state/version.rb index 2212bca..a2e5fca 100644 --- a/lib/flow_state/version.rb +++ b/lib/flow_state/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module FlowState - VERSION = '0.1.4' + VERSION = '0.1.5' end diff --git a/spec/flow_state_spec.rb b/spec/flow_state_spec.rb index 4d9d636..ab01953 100644 --- a/spec/flow_state_spec.rb +++ b/spec/flow_state_spec.rb @@ -7,7 +7,7 @@ state :draft state :review - error_state :failed + state :failed, error: true initial_state :draft prop :name, String From b1e08ac5fb49beaf0ff0fd9e318bbb6f8474749f Mon Sep 17 00:00:00 2001 From: Chris Garrett Date: Tue, 22 Apr 2025 10:56:20 +0100 Subject: [PATCH 2/6] Add initial model for transition artefacts --- README.md | 11 +++++---- lib/flow_state/base.rb | 2 +- lib/flow_state/flow_transition.rb | 6 +++++ lib/flow_state/transition_artefact.rb | 13 +++++++++++ .../flow_state/install_generator.rb | 2 ++ .../create_flow_state_transition_artefacts.rb | 13 +++++++++++ .../flow_state/install_generator_spec.rb | 23 +++++++++++-------- 7 files changed, 55 insertions(+), 15 deletions(-) create mode 100644 lib/flow_state/transition_artefact.rb create mode 100644 lib/generators/flow_state/templates/create_flow_state_transition_artefacts.rb diff --git a/README.md b/README.md index 58b2326..0b29fb8 100644 --- a/README.md +++ b/README.md @@ -69,8 +69,8 @@ class SyncSoundchartsFlow < FlowState::Base state :synced_audience_data state :completed - error_state :failed_to_sync_song_metadata - error_state :failed_to_sync_audience_data + state :failed_to_sync_song_metadata, error: true + state :failed_to_sync_audience_data, error: true initial_state :pending @@ -83,7 +83,7 @@ class SyncSoundchartsFlow < FlowState::Base end def start_song_metadata_sync! - transition!(from: :picked, to: :syncing_song_metadata) + transition!(from: %i[picked failed_to_sync_song_metadata], to: :syncing_song_metadata) end def finish_song_metadata_sync! @@ -98,7 +98,10 @@ class SyncSoundchartsFlow < FlowState::Base end def start_audience_data_sync! - transition!(from: :synced_song_metadata, to: :syncing_audience_data) + transition!( + from: %i[synced_song_metadata failed_to_sync_audience_data], + to: :syncing_audience_data + ) end def finish_audience_data_sync! diff --git a/lib/flow_state/base.rb b/lib/flow_state/base.rb index f4dac54..b03a1e8 100644 --- a/lib/flow_state/base.rb +++ b/lib/flow_state/base.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module FlowState - # Base Model to be extended by app models + # Base Model to be extended by app flows class Base < ActiveRecord::Base class UnknownStateError < StandardError; end class InvalidTransitionError < StandardError; end diff --git a/lib/flow_state/flow_transition.rb b/lib/flow_state/flow_transition.rb index 9cb1b77..e5fa586 100644 --- a/lib/flow_state/flow_transition.rb +++ b/lib/flow_state/flow_transition.rb @@ -9,5 +9,11 @@ class FlowTransition < ActiveRecord::Base class_name: 'FlowState::Base', foreign_key: :flow_id, inverse_of: :flow_transitions + + has_many :artefacts, + class_name: 'FlowState::TransitionArtefact', + foreign_key: :transition_id, + inverse_of: :transition, + dependent: :destroy end end diff --git a/lib/flow_state/transition_artefact.rb b/lib/flow_state/transition_artefact.rb new file mode 100644 index 0000000..aae5cbc --- /dev/null +++ b/lib/flow_state/transition_artefact.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module FlowState + # Model for logging transition changes to + class TransitionArtefact < ActiveRecord::Base + self.table_name = 'flow_state_flow_artefacts' + + belongs_to :transition, + class_name: 'FlowState::FlowTransition', + foreign_key: :transition_id, + inverse_of: :transition_artefacts + end +end diff --git a/lib/generators/flow_state/install_generator.rb b/lib/generators/flow_state/install_generator.rb index b3c1dd5..97131c4 100644 --- a/lib/generators/flow_state/install_generator.rb +++ b/lib/generators/flow_state/install_generator.rb @@ -16,6 +16,8 @@ class InstallGenerator < Rails::Generators::Base def create_migrations migration_template 'create_flow_state_flows.rb', 'db/migrate/create_flow_state_flows.rb' migration_template 'create_flow_state_flow_transitions.rb', 'db/migrate/create_flow_state_flow_transitions.rb' + migration_template 'create_flow_state_transition_artefacts.rb', + 'db/migrate/create_flow_state_transition_artefacts.rb' end def self.next_migration_number(_dirname) diff --git a/lib/generators/flow_state/templates/create_flow_state_transition_artefacts.rb b/lib/generators/flow_state/templates/create_flow_state_transition_artefacts.rb new file mode 100644 index 0000000..bc4d42b --- /dev/null +++ b/lib/generators/flow_state/templates/create_flow_state_transition_artefacts.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +# Tbale for flow transition changes +class CreateFlowStateTransitionArtefacts < ActiveRecord::Migration[8.0] + def change + create_table :flow_state_transition_artefacts do |t| + t.references :transition, null: false, foreign_key: { to_table: :flow_state_flow_transitions } + t.string :name, null: false + t.json :payload + t.timestamps + end + end +end diff --git a/spec/generators/flow_state/install_generator_spec.rb b/spec/generators/flow_state/install_generator_spec.rb index 6f0aff3..47ff928 100644 --- a/spec/generators/flow_state/install_generator_spec.rb +++ b/spec/generators/flow_state/install_generator_spec.rb @@ -25,28 +25,31 @@ end let(:migration_files) { Dir.glob(File.join(migration_dir, '*.rb')) } - let(:filenames) { migration_files.map { |f| File.basename(f) } } - let(:timestamps) { filenames.map { |filename| filename[/^\d{14}/] } } + let(:filenames) { migration_files.map { |f| File.basename(f) } } + let(:timestamps) { filenames.map { |filename| filename[/^\d{14}/] } } - it 'creates two migration files' do - expect(migration_files.size).to eq(2) + it 'creates three migration files' do + expect(migration_files.size).to eq(3) end it 'names the migrations correctly' do expect(filenames.any? { |name| name.match(/^\d{14}_create_flow_state_flows\.rb$/) }).to be true expect(filenames.any? { |name| name.match(/^\d{14}_create_flow_state_flow_transitions\.rb$/) }).to be true + expect(filenames.any? { |name| name.match(/^\d{14}_create_flow_state_transition_artefacts\.rb$/) }).to be true end it 'assigns unique timestamps to each migration' do - expect(timestamps.size).to eq(2) - expect(timestamps.uniq.size).to eq(2) + expect(timestamps.size).to eq(3) + expect(timestamps.uniq.size).to eq(3) end it 'generates correct class definitions inside the migrations' do - flow_state_flows = migration_files.find { |f| f.include?('create_flow_state_flows') } - flow_state_flow_transitions = migration_files.find { |f| f.include?('create_flow_state_flow_transitions') } + flows_migration = migration_files.find { |f| f.include?('create_flow_state_flows') } + transitions_migration = migration_files.find { |f| f.include?('create_flow_state_flow_transitions') } + artefacts_migration = migration_files.find { |f| f.include?('create_flow_state_transition_artefacts') } - expect(File.read(flow_state_flows)).to include('class CreateFlowStateFlows') - expect(File.read(flow_state_flow_transitions)).to include('class CreateFlowStateFlowTransitions') + expect(File.read(flows_migration)).to include('class CreateFlowStateFlows') + expect(File.read(transitions_migration)).to include('class CreateFlowStateFlowTransitions') + expect(File.read(artefacts_migration)).to include('class CreateFlowStateTransitionArtefacts') end end From 9628206b0bf9e6c7d3a926bb10f27c094b81dcdc Mon Sep 17 00:00:00 2001 From: Chris Garrett Date: Tue, 22 Apr 2025 11:20:06 +0100 Subject: [PATCH 3/6] Add support for transition artefacts --- lib/flow_state/base.rb | 95 ++++++++++++++++------- lib/flow_state/flow_transition.rb | 2 +- lib/flow_state/transition_artefact.rb | 4 +- spec/flow_state_spec.rb | 104 ++++++++++---------------- spec/spec_helper.rb | 7 ++ 5 files changed, 119 insertions(+), 93 deletions(-) diff --git a/lib/flow_state/base.rb b/lib/flow_state/base.rb index b03a1e8..f49a995 100644 --- a/lib/flow_state/base.rb +++ b/lib/flow_state/base.rb @@ -2,10 +2,12 @@ module FlowState # Base Model to be extended by app flows - class Base < ActiveRecord::Base - class UnknownStateError < StandardError; end + class Base < ActiveRecord::Base # rubocop:disable Metrics/ClassLength + class UnknownStateError < StandardError; end class InvalidTransitionError < StandardError; end class PayloadValidationError < StandardError; end + class GuardFailedError < StandardError; end + class UnknownArtefactError < StandardError; end DEPRECATOR = ActiveSupport::Deprecation.new(FlowState::VERSION, 'FlowState') @@ -17,6 +19,8 @@ class PayloadValidationError < StandardError; end inverse_of: :flow, dependent: :destroy + has_many :flow_artefacts, through: :flow_transitions + class << self def state(name, error: false) name = name.to_sym @@ -24,15 +28,6 @@ def state(name, error: false) error_states << name if error end - def error_state(name) - DEPRECATOR.warn( - 'FlowState::Base.error_state is deprecated. ' \ - 'Use state(name, error: true) instead.' - ) - - state(name, error: true) - end - def initial_state(name = nil) name ? @initial_state = name.to_sym : @initial_state end @@ -42,6 +37,10 @@ def prop(name, type) define_method(name) { payload&.dig(name.to_s) } end + def persist(name, type) + artefact_schema[name.to_sym] = type + end + def all_states @all_states ||= [] end @@ -53,6 +52,10 @@ def error_states def payload_schema @payload_schema ||= {} end + + def artefact_schema + @artefact_schema ||= {} + end end validates :current_state, presence: true @@ -60,35 +63,75 @@ def payload_schema after_initialize { self.current_state ||= resolve_initial_state } - def transition!(from:, to:, after_transition: nil) # rubocop:disable Metrics/AbcSize,Metrics/MethodLength - from = Array(from).map(&:to_sym) - to = to.to_sym + # Public API: handles state change, guards, artefacts and callback + def transition!(from:, to:, guard: nil, persists: nil, after_transition: nil, &block) + setup_transition!(from, to, guard, persists, &block) + perform_transition!(to, persists) + after_transition&.call + end - ensure_known_states!(from + [to]) + def errored? + self.class.error_states.include?(current_state&.to_sym) + end - with_lock do - unless from.include?(current_state&.to_sym) - raise InvalidTransitionError, - "state #{current_state} not in #{from} (#{from.inspect}->#{to.inspect}" - end + private + # 1) validate inputs, run guard, capture artefact info + def setup_transition!(from, to, guard, persists, &block) + @from_states = Array(from).map(&:to_sym) + @to_state = to.to_sym + + ensure_known_states!(@from_states + [@to_state]) + run_guard!(guard) if guard + @artefact_name, @artefact_data = load_artefact(persists, &block) if persists + end + + # 2) inside DB lock + tx create transition, update state, persist artefact + def perform_transition!(to, persists) + with_lock do + ensure_valid_from_state!(@from_states, to) transaction do - flow_transitions.create!( + @tr = flow_transitions.create!( transitioned_from: current_state, transitioned_to: to ) update!(current_state: to) + persist_artefact! if persists end end + end - after_transition&.call + def run_guard!(guard) + raise GuardFailedError, "guard failed for #{@to_state}" unless instance_exec(&guard) end - def errored? - self.class.error_states.include?(current_state&.to_sym) + def load_artefact(persists) + name = persists.to_sym + schema = self.class.artefact_schema + raise UnknownArtefactError, "#{name} not declared" unless schema.key?(name) + + data = yield + [name, data] end - private + def ensure_valid_from_state!(from_states, to) + return if from_states.include?(current_state&.to_sym) + + raise InvalidTransitionError, + "state #{current_state} not in #{from_states.inspect} -> #{to.inspect}" + end + + def persist_artefact! + expected = self.class.artefact_schema[@artefact_name] + unless @artefact_data.is_a?(expected) + raise PayloadValidationError, "artefact #{@artefact_name} must be #{expected}" + end + + @tr.flow_artefacts.create!( + name: @artefact_name.to_s, + payload: @artefact_data + ) + end def resolve_initial_state init = self.class.initial_state || self.class.all_states.first @@ -107,7 +150,7 @@ def validate_payload schema.each do |key, klass| v = payload&.dig(key.to_s) - raise PayloadValidationError, "#{key} missing" if v.nil? + raise PayloadValidationError, "#{key} missing" unless v raise PayloadValidationError, "#{key} must be #{klass}" unless v.is_a?(klass) end rescue PayloadValidationError => e diff --git a/lib/flow_state/flow_transition.rb b/lib/flow_state/flow_transition.rb index e5fa586..e9a4d72 100644 --- a/lib/flow_state/flow_transition.rb +++ b/lib/flow_state/flow_transition.rb @@ -10,7 +10,7 @@ class FlowTransition < ActiveRecord::Base foreign_key: :flow_id, inverse_of: :flow_transitions - has_many :artefacts, + has_many :flow_artefacts, class_name: 'FlowState::TransitionArtefact', foreign_key: :transition_id, inverse_of: :transition, diff --git a/lib/flow_state/transition_artefact.rb b/lib/flow_state/transition_artefact.rb index aae5cbc..a1f95c8 100644 --- a/lib/flow_state/transition_artefact.rb +++ b/lib/flow_state/transition_artefact.rb @@ -3,11 +3,11 @@ module FlowState # Model for logging transition changes to class TransitionArtefact < ActiveRecord::Base - self.table_name = 'flow_state_flow_artefacts' + self.table_name = 'flow_state_transition_artefacts' belongs_to :transition, class_name: 'FlowState::FlowTransition', foreign_key: :transition_id, - inverse_of: :transition_artefacts + inverse_of: :flow_artefacts end end diff --git a/spec/flow_state_spec.rb b/spec/flow_state_spec.rb index ab01953..b904428 100644 --- a/spec/flow_state_spec.rb +++ b/spec/flow_state_spec.rb @@ -11,89 +11,65 @@ initial_state :draft prop :name, String + + persist :third_party_api_response, Hash end) end let!(:flow) { Flow.create!(payload: { name: 'Example' }) } - describe 'initialisation' do - it 'sets the configured initial state' do - expect(flow.current_state).to eq('draft') - end - end - - describe 'attribute getter' do - it 'reads from payload' do - expect(flow.name).to eq('Example') + describe 'artefact persistence' do # rubocop:disable Metrics/BlockLength + it 'raises if you pass an unknown persists name' do + expect do + flow.transition!(from: :draft, to: :review, persists: :nope) { {} } + end.to raise_error(FlowState::Base::UnknownArtefactError) end - end - describe '#transition!' do - it 'updates the state and records a FlowTransition row' do + it 'raises if the block returns wrong type' do expect do - flow.transition!(from: :draft, to: :review) - end.to change { flow.flow_transitions.count }.by(1) - - expect(flow.reload.current_state).to eq('review') + flow.transition!(from: :draft, to: :review, persists: :third_party_api_response) { 'not a hash' } + end.to raise_error(FlowState::Base::PayloadValidationError, /must be Hash/) end - it 'runs the after_transition callback' do + it 'saves an artefact record before after_transition' do flag = false flow.transition!( from: :draft, to: :review, - after_transition: -> { flag = true } - ) - - expect(flag).to eq(true) - end - - it 'raises InvalidTransitionError when current state not in :from list' do - expect do - flow.transition!(from: :review, to: :draft) - end.to raise_error(FlowState::Base::InvalidTransitionError) - end - - it 'raises UnknownStateError when :to is unknown' do - expect do - flow.transition!(from: :draft, to: :bogus) - end.to raise_error(FlowState::Base::UnknownStateError) + persists: :third_party_api_response, + after_transition: lambda { + expect(flow.flow_transitions.last + .flow_artefacts + .find_by(name: 'third_party_api_response')).to be_present + flag = true + } + ) { { foo: 'bar' } } + + expect(flag).to be true + artefact = flow.flow_transitions.last.flow_artefacts.last + expect(artefact.name).to eq('third_party_api_response') + expect(artefact.payload).to eq('foo' => 'bar') end end - describe '#errored?' do - it 'is true when current_state is an error state' do - flow.update!(current_state: :failed) - expect(flow).to be_errored - end - end - - describe 'payload validation' do - it 'is invalid when a key is missing' do - flow = Flow.new - expect(flow).to be_invalid - expect(flow.errors[:payload]).to include('name missing') - end - - it 'is invalid when key type is wrong' do - flow = Flow.new(payload: { name: 1 }) - expect(flow).to be_invalid - expect(flow.errors[:payload]).to include('name must be String') + describe 'guards' do + it 'raises if guard block returns false' do + expect do + flow.transition!( + from: :draft, + to: :review, + guard: -> { false } + ) + end.to raise_error(FlowState::Base::GuardFailedError) end - end - describe 'initial state validation' do - it 'raises UnknownStateError if initial_state is not declared' do - stub_const('BadFlow', Class.new(FlowState::Base) do - self.table_name = 'flow_state_flows' - - state :only_state - initial_state :ghost - end) - - expect do - BadFlow.create! - end.to raise_error(FlowState::Base::UnknownStateError) + it 'allows transition when guard is true' do + flow.transition!( + from: :draft, + to: :review, + guard: -> { true } + ) + expect(flow.current_state).to eq('review') end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index d2217a1..bb96ca0 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -26,6 +26,13 @@ t.string :transitioned_to, null: false t.timestamps end + + create_table :flow_state_transition_artefacts do |t| + t.references :transition, null: false, foreign_key: { to_table: :flow_state_flow_transitions } + t.string :name, null: false + t.json :payload + t.timestamps + end end RSpec.configure do |config| From 301636cc7a986d357ce014d181c6bd4e67d0d99f Mon Sep 17 00:00:00 2001 From: Chris Garrett Date: Tue, 22 Apr 2025 11:48:50 +0100 Subject: [PATCH 4/6] Update README --- README.md | 126 +++++++++++++++++++++++------------------ lib/flow_state/base.rb | 5 +- 2 files changed, 73 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index 0b29fb8..9622617 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # FlowState -> **Model workflows without magic.** +> **Model workflows cleanly and explicitly.** --- @@ -46,91 +46,103 @@ bin/rails db:migrate --- -## Example: Syncing song data with Soundcharts +## Example: Saving a third party API response to local database Suppose you want to build a workflow that: -- Gets song metadata from Soundcharts -- Then fetches audience data -- Tracks each step and handles retries on failure +- Fetches a response from a third party API +- Then saves it to each database +- As separate jobs, tracking each step and permitting retries on failure +- While avoiding race conditions --- ### Define your Flow ```ruby -class SyncSoundchartsFlow < FlowState::Base - prop :song_id, String +class SyncThirdPartApiFlow < FlowState::Base + prop :my_record_id, String + prop :third_party_id, String state :pending state :picked - state :syncing_song_metadata - state :synced_song_metadata - state :syncing_audience_data - state :synced_audience_data + state :fetching_third_party_api + state :fetched_third_party_api + state :failed_to_fetch_third_party_api, error: true + state :saving_my_record + state :saved_my_record + state :failed_to_save_my_record, error: true state :completed - state :failed_to_sync_song_metadata, error: true - state :failed_to_sync_audience_data, error: true + persist :third_party_api_response initial_state :pending def pick! transition!( - from: %i[pending completed failed_to_sync_song_metadata failed_to_sync_audience_data], + from: %i[pending], to: :picked, - after_transition: -> { sync_song_metadata } + after_transition: -> { enqueue_fetch } ) end - def start_song_metadata_sync! - transition!(from: %i[picked failed_to_sync_song_metadata], to: :syncing_song_metadata) - end - - def finish_song_metadata_sync! + def start_third_party_api_request! transition!( - from: :syncing_song_metadata, to: :synced_song_metadata, - after_transition: -> { sync_audience_data } + from: %i[picked failed_to_fetch_third_party_api], + to: :fetching_third_party_api ) end - def fail_song_metadata_sync! - transition!(from: :syncing_song_metadata, to: :failed_to_sync_song_metadata) + def finish_third_party_api_request!(result) + transition!( + from: :fetching_third_party_api, + to: :fetched_third_party_api, + persists: :third_party_api_response, + after_transition: -> { enqueue_save } + ) { result } end - def start_audience_data_sync! + def fail_third_party_api_request! + transition!( + from: :fetching_third_party_api, + to: :failed_to_fetch_third_party_api + ) + end + + def start_record_save! transition!( - from: %i[synced_song_metadata failed_to_sync_audience_data], - to: :syncing_audience_data + from: %i[fetched_third_party_api failed_to_save_my_record], + to: :saving_my_record, + guard: -> { flow_artefacts.where(name: 'third_party_api_response').exists? } ) end - def finish_audience_data_sync! + def finish_record_save! transition!( - from: :syncing_audience_data, to: :synced_audience_data, + from: :saving_my_record, + to: :saved_my_record, after_transition: -> { complete! } ) end - def fail_audience_data_sync! - transition!(from: :syncing_audience_data, to: :failed_to_sync_audience_data) + def fail_record_save! + transition!( + from: :saving_my_record, + to: :failed_to_save_my_record + ) end def complete! - transition!(from: :synced_audience_data, to: :completed, after_transition: -> { destroy }) + transition!(from: :saved_my_record, to: :completed, after_transition: -> { destroy }) end private - def song - @song ||= Song.find(song_id) - end - - def sync_song_metadata - SyncSoundchartsSongJob.perform_later(flow_id: id) + def enqueue_fetch + FetchThirdPartyJob.perform_later(flow_id: id) end - def sync_audience_data - SyncSoundchartsAudienceJob.perform_later(flow_id: id) + def enqueue_save + SaveLocalRecordJob.perform_later(flow_id: id) end end ``` @@ -143,54 +155,60 @@ Each job moves the flow through the correct states, step-by-step. --- -**Sync song metadata** +**Fetch Third Party API Response** ```ruby -class SyncSoundchartsSongJob < ApplicationJob +class FetchThirdPartyJob < ApplicationJob def perform(flow_id:) @flow_id = flow_id - flow.start_song_metadata_sync! + flow.start_third_party_api_request! - # Fetch song metadata from Soundcharts etc + response = ThirdPartyApiRequest.new.to_h - flow.finish_song_metadata_sync! + flow.finish_third_party_api_request!(response) rescue - flow.fail_song_metadata_sync! + flow.fail_third_party_api_request! raise end private def flow - @flow ||= SyncSoundchartsFlow.find(@flow_id) + @flow ||= SyncThirdPartApiFlow.find(@flow_id) end end ``` --- -**Sync audience data** +**Save Result to Local Database** ```ruby -class SyncSoundchartsAudienceJob < ApplicationJob +class SaveLocalRecordJob < ApplicationJob def perform(flow_id:) @flow_id = flow_id - flow.start_audience_data_sync! + flow.start_record_save! - # Fetch audience data from Soundcharts etc + MyRecord.create!(payload) - flow.finish_audience_data_sync! + flow.finish_record_save! rescue - flow.fail_audience_data_sync! + flow.fail_record_save! raise end private def flow - @flow ||= SyncSoundchartsFlow.find(@flow_id) + @flow ||= SyncThirdPartApiFlow.find(@flow_id) + end + + def payload + @payload ||= flow.flow_artefacts.find_by( + name: :third_party_api_response + )&.payload end end ``` diff --git a/lib/flow_state/base.rb b/lib/flow_state/base.rb index f49a995..b2ab3b4 100644 --- a/lib/flow_state/base.rb +++ b/lib/flow_state/base.rb @@ -63,7 +63,6 @@ def artefact_schema after_initialize { self.current_state ||= resolve_initial_state } - # Public API: handles state change, guards, artefacts and callback def transition!(from:, to:, guard: nil, persists: nil, after_transition: nil, &block) setup_transition!(from, to, guard, persists, &block) perform_transition!(to, persists) @@ -76,7 +75,6 @@ def errored? private - # 1) validate inputs, run guard, capture artefact info def setup_transition!(from, to, guard, persists, &block) @from_states = Array(from).map(&:to_sym) @to_state = to.to_sym @@ -86,8 +84,7 @@ def setup_transition!(from, to, guard, persists, &block) @artefact_name, @artefact_data = load_artefact(persists, &block) if persists end - # 2) inside DB lock + tx create transition, update state, persist artefact - def perform_transition!(to, persists) + def perform_transition!(to, persists) # rubocop:disable Metrics/MethodLength with_lock do ensure_valid_from_state!(@from_states, to) transaction do From af88fd49cdeb24763f7c99df6f47613c03f9d6ff Mon Sep 17 00:00:00 2001 From: Chris Garrett Date: Tue, 22 Apr 2025 12:00:57 +0100 Subject: [PATCH 5/6] Expand README example --- README.md | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 9622617..de3c510 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,19 @@ Each job moves the flow through the correct states, step-by-step. --- +**Create and start the flow** + +```ruby +flow = SyncThirdPartApiFlow.create( + my_record_id: "my_local_record_id", + third_party_id: "some_service_id" +) + +flow.pick! +``` + +--- + **Fetch Third Party API Response** ```ruby @@ -164,7 +177,7 @@ class FetchThirdPartyJob < ApplicationJob flow.start_third_party_api_request! - response = ThirdPartyApiRequest.new.to_h + response = ThirdPartyApiRequest.new(id: flow.third_party_id).to_h flow.finish_third_party_api_request!(response) rescue @@ -191,7 +204,7 @@ class SaveLocalRecordJob < ApplicationJob flow.start_record_save! - MyRecord.create!(payload) + record.update!(payload: third_party_payload) flow.finish_record_save! rescue @@ -205,10 +218,14 @@ class SaveLocalRecordJob < ApplicationJob @flow ||= SyncThirdPartApiFlow.find(@flow_id) end - def payload - @payload ||= flow.flow_artefacts.find_by( - name: :third_party_api_response - )&.payload + def third_party_payload + flow.flow_artefacts + .find_by!(name: 'third_party_api_response') + .payload + end + + def record + @record ||= MyRecord.find(flow.my_record_id) end end ``` From 4231af94eb2ca340065222afef92930928dad02c Mon Sep 17 00:00:00 2001 From: Chris Garrett Date: Tue, 22 Apr 2025 12:10:08 +0100 Subject: [PATCH 6/6] More readme tweaks --- README.md | 55 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index de3c510..c340aff 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,34 @@ # FlowState -> **Model workflows cleanly and explicitly.** +> **Model workflows cleanly, explicitly, and with durable persistence between steps.** --- -**FlowState** provides a clean, Rails-native way to model **stepped workflows** as explicit, durable state machines. -It lets you define each step, move between states deliberately, and track execution — without relying on metaprogramming, `method_missing`, or hidden magic. +**FlowState** provides a clean, Rails-native way to model **stepped workflows** as explicit, durable workflows, with support for persisting arbitrary artefacts between transitions. It lets you define each step, move between states safely, track execution history, and persist payloads ("artefacts") in a type-safe way — without using metaprogramming, `method_missing`, or other hidden magic. -Every workflow instance is persisted to the database. -Every transition is logged. -Every change happens through clear, intention-revealing methods you define yourself. +Perfect for workflows that rely on third party resources and integrations. +Every workflow instance, transition and artefact is persisted to the database. +Every change happens through clear, intention-revealing methods that you define yourself. Built for real-world systems where you need to: - Track complex, multi-step processes -- Handle failures gracefully -- Persist state safely across asynchronous jobs +- Handle failures gracefully with error states and retries +- Persist state and interim data across asynchronous jobs +- Store and type-check arbitrary payloads (artefacts) between steps +- Avoid race conditions via database locks and explicit guards --- ## Key Features -- **Explicit transitions** — Every state change is triggered manually via a method you define. -- **Full execution history** — Every transition is recorded with timestamps and a history table. -- **Error recovery** — Model and track failures directly with error states. -- **Typed payloads** — Strongly-typed metadata attached to every workflow. -- **Persistence-first** — Workflow state is stored in your database, not memory. -- **No Magic** — No metaprogramming, no dynamic method generation, no `method_missing` tricks. +- **Explicit transitions** — Every state change is triggered manually via a method you define. +- **Full execution history** — Every transition is recorded with timestamps and a history table. +- **Error recovery** — Model and track failures directly with error states. +- **Typed payloads** — Strongly-typed metadata attached to every workflow. +- **Artefact persistence** — Declare named and typed artefacts to persist between specific transitions. +- **Guard clauses** — Protect transitions with guards that raise if conditions aren’t met. +- **Persistence-first** — Workflow state and payloads are stored in your database, not memory. +- **No Magic** — No metaprogramming, no dynamic method generation, no `method_missing` tricks. --- @@ -50,16 +53,18 @@ bin/rails db:migrate Suppose you want to build a workflow that: - Fetches a response from a third party API -- Then saves it to each database -- As separate jobs, tracking each step and permitting retries on failure -- While avoiding race conditions +- Allows for retrying the fetch on failure +- And persists the response to the workflow +- Then saves the persisted response to the database +- As two separate, encapsulated jobs +- Tracking each step, while protecting against race conditions --- ### Define your Flow ```ruby -class SyncThirdPartApiFlow < FlowState::Base +class SyncThirdPartyApiFlow < FlowState::Base prop :my_record_id, String prop :third_party_id, String @@ -110,8 +115,8 @@ class SyncThirdPartApiFlow < FlowState::Base def start_record_save! transition!( - from: %i[fetched_third_party_api failed_to_save_my_record], - to: :saving_my_record, + from: %i[fetched_third_party_api failed_to_save_my_record], + to: :saving_my_record, guard: -> { flow_artefacts.where(name: 'third_party_api_response').exists? } ) end @@ -158,7 +163,7 @@ Each job moves the flow through the correct states, step-by-step. **Create and start the flow** ```ruby -flow = SyncThirdPartApiFlow.create( +flow = SyncThirdPartyApiFlow.create( my_record_id: "my_local_record_id", third_party_id: "some_service_id" ) @@ -172,6 +177,10 @@ flow.pick! ```ruby class FetchThirdPartyJob < ApplicationJob + retry_on StandardError, + wait: ->(executions) { 10.seconds * (2**executions) }, + attempts: 3 + def perform(flow_id:) @flow_id = flow_id @@ -188,7 +197,7 @@ class FetchThirdPartyJob < ApplicationJob private def flow - @flow ||= SyncThirdPartApiFlow.find(@flow_id) + @flow ||= SyncThirdPartyApiFlow.find(@flow_id) end end ``` @@ -215,7 +224,7 @@ class SaveLocalRecordJob < ApplicationJob private def flow - @flow ||= SyncThirdPartApiFlow.find(@flow_id) + @flow ||= SyncThirdPartyApiFlow.find(@flow_id) end def third_party_payload