From 24b124565912788ff20c3e6f6fca19c41902907c Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Mon, 25 Jul 2022 16:23:06 -0700 Subject: [PATCH 01/12] adding support for queries --- examples/bin/query | 14 +++++ examples/bin/worker | 1 + .../spec/integration/query_workflow_spec.rb | 55 +++++++++++++++++++ examples/workflows/query_workflow.rb | 36 ++++++++++++ 4 files changed, 106 insertions(+) create mode 100644 examples/bin/query create mode 100644 examples/spec/integration/query_workflow_spec.rb create mode 100644 examples/workflows/query_workflow.rb diff --git a/examples/bin/query b/examples/bin/query new file mode 100644 index 00000000..dbe95b1d --- /dev/null +++ b/examples/bin/query @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby +require_relative '../init' + +Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f } + +workflow_class_name, workflow_id, run_id, query, args = ARGV +workflow_class = Object.const_get(workflow_class_name) + +if ![workflow_class, workflow_id, run_id, query].all? + fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`' +end + +result = Cadence.query_workflow(workflow_class, query, workflow_id, run_id, args) +puts result.inspect \ No newline at end of file diff --git a/examples/bin/worker b/examples/bin/worker index 2d6735fe..4e3386c0 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -28,6 +28,7 @@ worker.register_workflow(SideEffectWorkflow) worker.register_workflow(SimpleTimerWorkflow) worker.register_workflow(TimeoutWorkflow) worker.register_workflow(TripBookingWorkflow) +worker.register_workflow(QueryWorkflow) worker.register_activity(AsyncActivity) worker.register_activity(EchoActivity) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb new file mode 100644 index 00000000..89c10e2c --- /dev/null +++ b/examples/spec/integration/query_workflow_spec.rb @@ -0,0 +1,55 @@ +require 'examples/workflows/query_workflow' +require 'cadence/errors' + + +describe QueryWorkflow, :integration do + subject { described_class } + + it 'returns the correct result for the queries' do + workflow_id, run_id = run_workflow(described_class) + + # Query with nil workflow class + expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id)) + .to eq 'started' + + # Query with arbitrary args + expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id, + 'upcase', 'ignored', 'reverse')) + .to eq 'DETRATS' + + # Query with no args + expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id)) + .to eq 0 + + # Query with unregistered handler + expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } + .to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query') + + Cadence.signal_workflow(described_class, 'make_progress', workflow_id, run_id) + + # Query for updated signal_count with an unsatisfied reject condition + expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open)) + .to eq 1 + + Cadence.signal_workflow(described_class, 'finish', workflow_id, run_id) + wait_for_workflow_completion(workflow_id, run_id) + + # Repeating original query scenarios above, expecting updated state and signal results + expect(Cadence.query_workflow(nil, 'state', workflow_id, run_id)) + .to eq 'finished' + + expect(Cadence.query_workflow(described_class, 'state', workflow_id, run_id, + 'upcase', 'ignored', 'reverse')) + .to eq 'DEHSINIF' + + expect(Cadence.query_workflow(described_class, 'signal_count', workflow_id, run_id)) + .to eq 2 + + expect { Cadence.query_workflow(described_class, 'unknown_query', workflow_id, run_id) } + .to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for unknown_query') + + # Now that the workflow is completed, test a query with a reject condition satisfied + expect { Cadence.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) } + .to raise_error(Cadence::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED') + end +end \ No newline at end of file diff --git a/examples/workflows/query_workflow.rb b/examples/workflows/query_workflow.rb new file mode 100644 index 00000000..74759250 --- /dev/null +++ b/examples/workflows/query_workflow.rb @@ -0,0 +1,36 @@ +class QueryWorkflow < Cadence::Workflow + attr_reader :state, :signal_count, :last_signal_received + + def execute + @state = "started" + @signal_count = 0 + @last_signal_received = nil + + workflow.on_query("state") { |*args| apply_transforms(state, args) } + workflow.on_query("signal_count") { signal_count } + + workflow.on_signal do |signal| + @signal_count += 1 + @last_signal_received = signal + end + + workflow.wait_for { last_signal_received == "finish" } + @state = "finished" + + { + signal_count: signal_count, + last_signal_received: last_signal_received, + final_state: state + } + end + + private + + def apply_transforms(value, transforms) + return value if value.nil? || transforms.empty? + transforms.inject(value) do |memo, input| + next memo unless memo.respond_to?(input) + memo.public_send(input) + end + end +end \ No newline at end of file From ab16ce00797898aa937f44fcc112c2c1e18eaafa Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Mon, 25 Jul 2022 16:25:49 -0700 Subject: [PATCH 02/12] added relevant files for client query --- lib/cadence.rb | 1 + lib/cadence/client.rb | 11 ++++ lib/cadence/connection/thrift.rb | 57 +++++++++++++++---- lib/cadence/errors.rb | 5 ++ lib/cadence/workflow/context.rb | 9 ++- lib/cadence/workflow/executor.rb | 18 +++++- lib/cadence/workflow/query_registry.rb | 33 +++++++++++ lib/cadence/workflow/query_result.rb | 16 ++++++ .../unit/lib/cadence/workflow/context_spec.rb | 3 +- .../lib/cadence/workflow/executor_spec.rb | 2 + .../lib/cadence/workflow/query_answer_spec.rb | 0 .../lib/cadence/workflow/query_result_spec.rb | 25 ++++++++ 12 files changed, 165 insertions(+), 15 deletions(-) create mode 100644 lib/cadence/workflow/query_registry.rb create mode 100644 lib/cadence/workflow/query_result.rb create mode 100644 spec/unit/lib/cadence/workflow/query_answer_spec.rb create mode 100644 spec/unit/lib/cadence/workflow/query_result_spec.rb diff --git a/lib/cadence.rb b/lib/cadence.rb index 6c1f4c92..2bbaea22 100644 --- a/lib/cadence.rb +++ b/lib/cadence.rb @@ -12,6 +12,7 @@ module Cadence :schedule_workflow, :register_domain, :signal_workflow, + :query_workflow, :reset_workflow, :terminate_workflow, :fetch_workflow_execution_info, diff --git a/lib/cadence/client.rb b/lib/cadence/client.rb index fdb248fe..0979e7a1 100644 --- a/lib/cadence/client.rb +++ b/lib/cadence/client.rb @@ -75,6 +75,17 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil) ) end + def query_workflow(workflow, query, workflow_id, run_id, *args, domain: nil, query_reject_condition: nil) + connection.query_workflow( + domain: domain || workflow.domain, + workflow_id: workflow_id, + run_id: run_id, + query: query, + args: args, + query_reject_condition: query_reject_condition + ) + end + def reset_workflow(domain, workflow_id, run_id, strategy: nil, decision_task_id: nil, reason: 'manual reset') # Pick default strategy for backwards-compatibility strategy ||= :last_decision_task unless decision_task_id diff --git a/lib/cadence/connection/thrift.rb b/lib/cadence/connection/thrift.rb index bbdf2a9a..b87192e7 100644 --- a/lib/cadence/connection/thrift.rb +++ b/lib/cadence/connection/thrift.rb @@ -14,15 +14,21 @@ class Thrift reject: CadenceThrift::WorkflowIdReusePolicy::RejectDuplicate }.freeze + QUERY_REJECT_CONDITION = { + # none: CadenceThrift::QueryRejectCondition::NONE, + not_open: CadenceThrift::QueryRejectCondition::NOT_OPEN, + not_completed_cleanly: CadenceThrift::QueryRejectCondition::NOT_COMPLETED_CLEANLY + }.freeze + DEFAULT_OPTIONS = { polling_ttl: 60, # 1 minute max_page_size: 100 }.freeze HISTORY_EVENT_FILTER = { - all: CadenceThrift::HistoryEventFilterType::ALL_EVENT, - close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT, - }.freeze + all: CadenceThrift::HistoryEventFilterType::ALL_EVENT, + close: CadenceThrift::HistoryEventFilterType::CLOSE_EVENT, + }.freeze def initialize(host, port, identity, options = {}) @url = "http://#{host}:#{port}" @@ -345,8 +351,39 @@ def reset_sticky_task_list raise NotImplementedError end - def query_workflow - raise NotImplementedError + def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil) + request = CadenceThrift::QueryWorkflowRequest.new( + domain: domain, + execution: CadenceThrift::WorkflowExecution.new( + workflow_id: workflow_id, + run_id: run_id + ), + query: CadenceThrift::WorkflowQuery.new( + query_type: query, + query_args: args.to_json + ) + ) + if query_reject_condition + condition = QUERY_REJECT_CONDITION[query_reject_condition] + raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition + + request.query_reject_condition = condition + end + + begin + response = client.query_workflow(request) + rescue ::GRPC::InvalidArgument => e + raise Cadence::QueryFailed, e.details + end + + if response.query_rejected + rejection_status = response.query_rejected.status || 'not specified by server' + raise Cadence::QueryFailed, "Query rejected: status #{rejection_status}" + elsif !response.query_result + raise Cadence::QueryFailed, 'Invalid response from server' + else + response.query_result.from_json + end end def describe_workflow_execution(domain:, workflow_id:, run_id:) @@ -389,9 +426,9 @@ def transport def connection @connection ||= begin - protocol = ::Thrift::BinaryProtocol.new(transport) - CadenceThrift::WorkflowService::Client.new(protocol) - end + protocol = ::Thrift::BinaryProtocol.new(transport) + CadenceThrift::WorkflowService::Client.new(protocol) + end end def send_request(name, request) @@ -413,7 +450,7 @@ def serialize_time_filter(from, to) CadenceThrift::StartTimeFilter.new( earliestTime: Cadence::Utils.time_to_nanos(from).to_i, latestTime: Cadence::Utils.time_to_nanos(to).to_i, - ) + ) end def serialize_execution_filter(value) @@ -435,4 +472,4 @@ def serialize_status_filter(value) end end end -end +end \ No newline at end of file diff --git a/lib/cadence/errors.rb b/lib/cadence/errors.rb index 28fc4734..fb01ff91 100644 --- a/lib/cadence/errors.rb +++ b/lib/cadence/errors.rb @@ -18,4 +18,9 @@ class TimeoutError < ClientError; end # A superclass for activity exceptions raised explicitly # with the itent to propagate to a workflow class ActivityException < ClientError; end + + class ApiError < Error; end + + class QueryFailed < ApiError; end + end diff --git a/lib/cadence/workflow/context.rb b/lib/cadence/workflow/context.rb index 9c852f7a..a130263e 100644 --- a/lib/cadence/workflow/context.rb +++ b/lib/cadence/workflow/context.rb @@ -17,9 +17,10 @@ class Workflow class Context attr_reader :metadata - def initialize(state_manager, dispatcher, metadata, config) + def initialize(state_manager, dispatcher, metadata, config, query_registry) @state_manager = state_manager @dispatcher = dispatcher + @query_registry = query_registry @metadata = metadata @config = config end @@ -227,6 +228,10 @@ def on_signal(&block) end end + def on_query(query, &block) + query_registry.register(query, &block) + end + def cancel_activity(activity_id) decision = Decision::RequestActivityCancellation.new(activity_id: activity_id) @@ -246,7 +251,7 @@ def cancel(target, cancelation_id) private - attr_reader :state_manager, :dispatcher, :config + attr_reader :state_manager, :dispatcher, :config, :query_registry def schedule_decision(decision) state_manager.schedule(decision) diff --git a/lib/cadence/workflow/executor.rb b/lib/cadence/workflow/executor.rb index daf5b77f..51f294ea 100644 --- a/lib/cadence/workflow/executor.rb +++ b/lib/cadence/workflow/executor.rb @@ -1,6 +1,7 @@ require 'fiber' require 'cadence/workflow/dispatcher' +require 'cadence/workflow/query_registry' require 'cadence/workflow/state_manager' require 'cadence/workflow/context' require 'cadence/workflow/history/event_target' @@ -12,6 +13,7 @@ class Executor def initialize(workflow_class, history, metadata, config) @workflow_class = workflow_class @dispatcher = Dispatcher.new + @query_registry = QueryRegistry.new @state_manager = StateManager.new(dispatcher) @metadata = metadata @history = history @@ -32,13 +34,25 @@ def run return state_manager.decisions end + # Process queries using the pre-registered query handlers + # + # @note this method is expected to be executed after the history has + # been fully replayed (by invoking the #run method) + # + # @param queries [Hash] + # + # @return [Hash] + def process_queries(queries = {}) + queries.transform_values(&method(:process_query)) + end + private - attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config + attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config, :query_registry def execute_workflow(input, workflow_started_event_attributes) metadata = generate_workflow_metadata_from(workflow_started_event_attributes) - context = Workflow::Context.new(state_manager, dispatcher, metadata, config) + context = Workflow::Context.new(state_manager, dispatcher, metadata, config, query_registry) Fiber.new do workflow_class.execute_in_context(context, input) diff --git a/lib/cadence/workflow/query_registry.rb b/lib/cadence/workflow/query_registry.rb new file mode 100644 index 00000000..e9c655fb --- /dev/null +++ b/lib/cadence/workflow/query_registry.rb @@ -0,0 +1,33 @@ +require 'cadence/errors' + +module Cadence + class Workflow + class QueryRegistry + def initialize + @handlers = {} + end + + def register(type, &handler) + if handlers.key?(type) + warn "[NOTICE] Overwriting a query handler for #{type}" + end + + handlers[type] = handler + end + + def handle(type, args = nil) + handler = handlers[type] + + unless handler + raise Cadence::QueryFailed, "Workflow did not register a handler for #{type}" + end + + handler.call(*args) + end + + private + + attr_reader :handlers + end + end +end diff --git a/lib/cadence/workflow/query_result.rb b/lib/cadence/workflow/query_result.rb new file mode 100644 index 00000000..ab119f51 --- /dev/null +++ b/lib/cadence/workflow/query_result.rb @@ -0,0 +1,16 @@ +module Cadence + class Workflow + module QueryResult + Answer = Struct.new(:result) + Failure = Struct.new(:error) + + def self.answer(result) + Answer.new(result).freeze + end + + def self.failure(error) + Failure.new(error).freeze + end + end + end +end \ No newline at end of file diff --git a/spec/unit/lib/cadence/workflow/context_spec.rb b/spec/unit/lib/cadence/workflow/context_spec.rb index 62a7d566..ef17f7c9 100644 --- a/spec/unit/lib/cadence/workflow/context_spec.rb +++ b/spec/unit/lib/cadence/workflow/context_spec.rb @@ -4,7 +4,7 @@ require 'cadence/configuration' describe Cadence::Workflow::Context do - subject { described_class.new(state_manager, dispatcher, metadata, config) } + subject { described_class.new(state_manager, dispatcher, metadata, config, query_registry) } let(:state_manager) { instance_double('Cadence::Workflow::StateManager') } let(:dispatcher) { Cadence::Workflow::Dispatcher.new } @@ -21,6 +21,7 @@ end let(:metadata) { Cadence::Metadata::Workflow.new(metadata_hash) } let(:config) { Cadence::Configuration.new } + let(:query_registry) { instance_double('Cadence::Workflow::QueryRegistry') } describe '#headers' do it 'returns metadata headers' do diff --git a/spec/unit/lib/cadence/workflow/executor_spec.rb b/spec/unit/lib/cadence/workflow/executor_spec.rb index 3612e037..e4e6cd6d 100644 --- a/spec/unit/lib/cadence/workflow/executor_spec.rb +++ b/spec/unit/lib/cadence/workflow/executor_spec.rb @@ -74,3 +74,5 @@ def execute end end end + + diff --git a/spec/unit/lib/cadence/workflow/query_answer_spec.rb b/spec/unit/lib/cadence/workflow/query_answer_spec.rb new file mode 100644 index 00000000..e69de29b diff --git a/spec/unit/lib/cadence/workflow/query_result_spec.rb b/spec/unit/lib/cadence/workflow/query_result_spec.rb new file mode 100644 index 00000000..4c5704f3 --- /dev/null +++ b/spec/unit/lib/cadence/workflow/query_result_spec.rb @@ -0,0 +1,25 @@ +require 'cadence/workflow/query_result' + +describe Cadence::Workflow::QueryResult do + describe '.answer' do + it 'returns an answer query result' do + result = described_class.answer(42) + + expect(result).to be_a(Cadence::Workflow::QueryResult::Answer) + expect(result).to be_frozen + expect(result.result).to eq(42) + end + end + + describe '.failure' do + let(:error) { StandardError.new('Test query failure') } + + it 'returns a failure query result' do + result = described_class.failure(error) + + expect(result).to be_a(Cadence::Workflow::QueryResult::Failure) + expect(result).to be_frozen + expect(result.error).to eq(error) + end + end +end \ No newline at end of file From 6ad5e00be24b254934dd496c0eb7cc5e88b99dcd Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Tue, 26 Jul 2022 09:29:14 -0700 Subject: [PATCH 03/12] modified executor.rb, added tests for query_registry.rb --- lib/cadence/workflow/executor.rb | 8 +++++ .../lib/cadence/workflow/query_answer_spec.rb | 0 .../cadence/workflow/query_registry_spec.rb | 33 +++++++++++++++++++ 3 files changed, 41 insertions(+) delete mode 100644 spec/unit/lib/cadence/workflow/query_answer_spec.rb create mode 100644 spec/unit/lib/cadence/workflow/query_registry_spec.rb diff --git a/lib/cadence/workflow/executor.rb b/lib/cadence/workflow/executor.rb index 51f294ea..e628bbb0 100644 --- a/lib/cadence/workflow/executor.rb +++ b/lib/cadence/workflow/executor.rb @@ -50,6 +50,14 @@ def process_queries(queries = {}) attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config, :query_registry + def process_query(query) + result = query_registry.handle(query.query_type, query.query_args) + + QueryResult.answer(result) + rescue StandardError => error + QueryResult.failure(error) + end + def execute_workflow(input, workflow_started_event_attributes) metadata = generate_workflow_metadata_from(workflow_started_event_attributes) context = Workflow::Context.new(state_manager, dispatcher, metadata, config, query_registry) diff --git a/spec/unit/lib/cadence/workflow/query_answer_spec.rb b/spec/unit/lib/cadence/workflow/query_answer_spec.rb deleted file mode 100644 index e69de29b..00000000 diff --git a/spec/unit/lib/cadence/workflow/query_registry_spec.rb b/spec/unit/lib/cadence/workflow/query_registry_spec.rb new file mode 100644 index 00000000..07950642 --- /dev/null +++ b/spec/unit/lib/cadence/workflow/query_registry_spec.rb @@ -0,0 +1,33 @@ +require 'cadence/errors' + +module Cadence + class Workflow + class QueryRegistry + def initialize + @handlers = {} + end + + def register(type, &handler) + if handlers.key?(type) + warn "[NOTICE] Overwriting a query handler for #{type}" + end + + handlers[type] = handler + end + + def handle(type, args = nil) + handler = handlers[type] + + unless handler + raise Cadence::QueryFailed, "Workflow did not register a handler for #{type}" + end + + handler.call(*args) + end + + private + + attr_reader :handlers + end + end +end \ No newline at end of file From 739f43fd26bb722d1025510b7e31e2ef3a2270b7 Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Tue, 26 Jul 2022 15:00:49 -0700 Subject: [PATCH 04/12] implemented changes in thrift.rb, synced changes from task_processor.rb --- lib/cadence/connection/thrift.rb | 19 ++++-- .../workflow/decision_task_processor.rb | 59 +++++++++++++++++-- .../thrift/workflow_query_fabricator.rb | 6 ++ .../workflow/decision_task_processor_spec.rb | 4 +- 4 files changed, 76 insertions(+), 12 deletions(-) create mode 100644 spec/fabricators/thrift/workflow_query_fabricator.rb diff --git a/lib/cadence/connection/thrift.rb b/lib/cadence/connection/thrift.rb index b87192e7..8106c4d2 100644 --- a/lib/cadence/connection/thrift.rb +++ b/lib/cadence/connection/thrift.rb @@ -343,8 +343,17 @@ def get_search_attributes raise NotImplementedError end - def respond_query_task_completed - raise NotImplementedError + def respond_query_task_completed(namespace:, task_token:, query_result:) + query_result_proto = Serializer.serialize(query_result) + request = CadenceThrift::RespondQueryTaskCompletedRequest.new( + task_token: task_token, + namespace: namespace, + completed_type: query_result_proto.result_type, + query_result: query_result_proto.answer, + error_message: query_result_proto.error_message, + ) + + client.respond_query_task_completed(request) end def reset_sticky_task_list @@ -360,7 +369,7 @@ def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reje ), query: CadenceThrift::WorkflowQuery.new( query_type: query, - query_args: args.to_json + query_args: JSON.serialize(args) ) ) if query_reject_condition @@ -372,7 +381,7 @@ def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reje begin response = client.query_workflow(request) - rescue ::GRPC::InvalidArgument => e + rescue Cadence::InvalidArgument => e raise Cadence::QueryFailed, e.details end @@ -382,7 +391,7 @@ def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reje elsif !response.query_result raise Cadence::QueryFailed, 'Invalid response from server' else - response.query_result.from_json + JSON.deserialize(response.query_result) end end diff --git a/lib/cadence/workflow/decision_task_processor.rb b/lib/cadence/workflow/decision_task_processor.rb index 1781cc41..10b76444 100644 --- a/lib/cadence/workflow/decision_task_processor.rb +++ b/lib/cadence/workflow/decision_task_processor.rb @@ -7,7 +7,19 @@ module Cadence class Workflow class DecisionTaskProcessor + Query = Struct.new(:query) do + + def query_type + query.query_type + end + + def query_args + JSON.deserialize(query.query_args) + end + end + MAX_FAILED_ATTEMPTS = 50 + LEGACY_QUERY_KEY = :legacy_query def initialize(task, domain, workflow_lookup, middleware_chain, config) @task = task @@ -39,7 +51,14 @@ def process executor.run end - complete_task(decisions) + query_results = executor.process_queries(parse_queries) + + if legacy_query_task? + complete_query(query_results[LEGACY_QUERY_KEY]) + else + complete_task(commands, query_results) + end + rescue StandardError => error fail_task(error.inspect) Cadence.logger.debug(error.backtrace.join("\n")) @@ -86,16 +105,44 @@ def fetch_full_history Workflow::History.new(events) end - def complete_task(decisions) + def legacy_query_task? + !!task.query + end + + def parse_queries + # Support for deprecated query style + if legacy_query_task? + { LEGACY_QUERY_KEY => Query.new(task.query) } + else + task.queries.each_with_object({}) do |(query_id, query), result| + result[query_id] = Query.new(query) + end + end + end + + def complete_task(commands, query_results) Cadence.logger.info("Decision task for #{workflow_name} completed") - connection.respond_decision_task_completed( + connection.respond_workflow_task_completed( + namespace: namespace, + task_token: task_token, + commands: commands, + query_results: query_results + ) + end + + def complete_query(result) + Cadence.logger.info("Workflow Query task completed", metadata.to_h) + + connection.respond_query_task_completed( + namespace: namespace, task_token: task_token, - decisions: serialize_decisions(decisions) + query_result: result ) rescue StandardError => error - Cadence.logger.error("Unable to complete Decision task #{workflow_name}: #{error.inspect}") - Cadence::ErrorHandler.handle(error, metadata: metadata) + Cadence.logger.error("Unable to complete a query", metadata.to_h.merge(error: error.inspect)) + + Cadence::ErrorHandler.handle(error, config, metadata: metadata) end def fail_task(message) diff --git a/spec/fabricators/thrift/workflow_query_fabricator.rb b/spec/fabricators/thrift/workflow_query_fabricator.rb new file mode 100644 index 00000000..03e9412c --- /dev/null +++ b/spec/fabricators/thrift/workflow_query_fabricator.rb @@ -0,0 +1,6 @@ +Fabricator(:api_workflow_query, from: CadenceThrift::WorkflowQuery) do + query_type { 'state' } + # might need to change the line below + query_args { Cadence.configuration.converter.to_payloads(['']) } +end + diff --git a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb index b3e702ec..f7e7f0b2 100644 --- a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb +++ b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb @@ -11,7 +11,9 @@ class TestWorkflow < Cadence::Workflow; end subject { described_class.new(task, domain, lookup, middleware_chain, config) } - let(:task) { Fabricate(:decision_task_thrift) } + let(:query) { nil } + let(:queries) { nil } + let(:task) { Fabricate(:api_workflow_task, { workflow_type: api_workflow_type, query: query, queries: queries }.compact) } let(:domain) { 'test-domain' } let(:lookup) { Cadence::ExecutableLookup.new } let(:connection) do From a716bf6749ef67cc226ce8f75b87869d0898f7c4 Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Thu, 28 Jul 2022 10:00:32 -0700 Subject: [PATCH 05/12] synced over changes from serializer, minor changes elsewhere --- lib/cadence/connection/thrift.rb | 22 +++++++++---------- .../workflow/decision_task_processor.rb | 7 +++--- lib/cadence/workflow/serializer.rb | 7 +++++- .../workflow/serializer/query_answer.rb | 17 ++++++++++++++ .../workflow/serializer/query_failure.rb | 16 ++++++++++++++ .../workflow/serializer/query_answer_spec.rb | 21 ++++++++++++++++++ .../workflow/serializer/query_failure_spec.rb | 19 ++++++++++++++++ 7 files changed, 93 insertions(+), 16 deletions(-) create mode 100644 lib/cadence/workflow/serializer/query_answer.rb create mode 100644 lib/cadence/workflow/serializer/query_failure.rb create mode 100644 spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb create mode 100644 spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb diff --git a/lib/cadence/connection/thrift.rb b/lib/cadence/connection/thrift.rb index 8106c4d2..4b1b51f7 100644 --- a/lib/cadence/connection/thrift.rb +++ b/lib/cadence/connection/thrift.rb @@ -148,11 +148,12 @@ def poll_for_decision_task(domain:, task_list:) send_request('PollForDecisionTask', request) end - def respond_decision_task_completed(task_token:, decisions:) + def respond_decision_task_completed(task_token:, decisions:, query_results:) request = CadenceThrift::RespondDecisionTaskCompletedRequest.new( identity: identity, taskToken: task_token, - decisions: Array(decisions) + decisions: Array(decisions), + queryResults: query_results ) send_request('RespondDecisionTaskCompleted', request) end @@ -343,15 +344,14 @@ def get_search_attributes raise NotImplementedError end - def respond_query_task_completed(namespace:, task_token:, query_result:) + def respond_query_task_completed(task_token:, query_result:) query_result_proto = Serializer.serialize(query_result) - request = CadenceThrift::RespondQueryTaskCompletedRequest.new( - task_token: task_token, - namespace: namespace, - completed_type: query_result_proto.result_type, - query_result: query_result_proto.answer, - error_message: query_result_proto.error_message, - ) + request = CadenceThrift::RespondQueryTaskCompletedRequest.new( + taskToken: task_token, + completedType: query_result_proto.result_type, + queryResult: query_result_proto.answer, + errorMessage: query_result_proto.error_message, + ) client.respond_query_task_completed(request) end @@ -459,7 +459,7 @@ def serialize_time_filter(from, to) CadenceThrift::StartTimeFilter.new( earliestTime: Cadence::Utils.time_to_nanos(from).to_i, latestTime: Cadence::Utils.time_to_nanos(to).to_i, - ) + ) end def serialize_execution_filter(value) diff --git a/lib/cadence/workflow/decision_task_processor.rb b/lib/cadence/workflow/decision_task_processor.rb index 10b76444..6b4d37d1 100644 --- a/lib/cadence/workflow/decision_task_processor.rb +++ b/lib/cadence/workflow/decision_task_processor.rb @@ -123,11 +123,10 @@ def parse_queries def complete_task(commands, query_results) Cadence.logger.info("Decision task for #{workflow_name} completed") - connection.respond_workflow_task_completed( - namespace: namespace, + connection.respond_decision_task_completed( task_token: task_token, - commands: commands, - query_results: query_results + # not sure what to put as the decisions here + decisions: query_results ) end diff --git a/lib/cadence/workflow/serializer.rb b/lib/cadence/workflow/serializer.rb index dcc891f9..f9879a6b 100644 --- a/lib/cadence/workflow/serializer.rb +++ b/lib/cadence/workflow/serializer.rb @@ -1,4 +1,5 @@ require 'cadence/workflow/decision' +require 'cadence/workflow/query_result' require 'cadence/workflow/serializer/schedule_activity' require 'cadence/workflow/serializer/start_child_workflow' require 'cadence/workflow/serializer/request_activity_cancellation' @@ -7,6 +8,8 @@ require 'cadence/workflow/serializer/cancel_timer' require 'cadence/workflow/serializer/complete_workflow' require 'cadence/workflow/serializer/fail_workflow' +require 'cadence/workflow/serializer/query_answer' +require 'cadence/workflow/serializer/query_failure' module Cadence class Workflow @@ -19,7 +22,9 @@ module Serializer Workflow::Decision::StartTimer => Serializer::StartTimer, Workflow::Decision::CancelTimer => Serializer::CancelTimer, Workflow::Decision::CompleteWorkflow => Serializer::CompleteWorkflow, - Workflow::Decision::FailWorkflow => Serializer::FailWorkflow + Workflow::Decision::FailWorkflow => Serializer::FailWorkflow, + Workflow::QueryResult::Answer => Serializer::QueryAnswer, + Workflow::QueryResult::Failure => Serializer::QueryFailure, }.freeze def self.serialize(object) diff --git a/lib/cadence/workflow/serializer/query_answer.rb b/lib/cadence/workflow/serializer/query_answer.rb new file mode 100644 index 00000000..8647353e --- /dev/null +++ b/lib/cadence/workflow/serializer/query_answer.rb @@ -0,0 +1,17 @@ +require 'cadence/workflow/serializer/base' + +module Cadence + class Workflow + module Serializer + class QueryAnswer < Base + + def to_proto + CadenceThrift::WorkflowQueryResult.new( + result_type: CadenceThrift::QueryResultType::ANSWERED, + answer: JSON.serialize(object.result) + ) + end + end + end + end +end \ No newline at end of file diff --git a/lib/cadence/workflow/serializer/query_failure.rb b/lib/cadence/workflow/serializer/query_failure.rb new file mode 100644 index 00000000..fdc58b18 --- /dev/null +++ b/lib/cadence/workflow/serializer/query_failure.rb @@ -0,0 +1,16 @@ +require 'cadence/workflow/serializer/base' + +module Cadence + class Workflow + module Serializer + class QueryFailure < Base + def to_proto + CadenceThrift::WorkflowQueryResult.new( + result_type: CadenceThrift::QueryResultType::FAILED, + error_message: object.error.message + ) + end + end + end + end +end diff --git a/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb new file mode 100644 index 00000000..5da9f2aa --- /dev/null +++ b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb @@ -0,0 +1,21 @@ +require 'cadence/workflow/serializer/query_failure' +require 'cadence/workflow/query_result' +require 'cadence/workflow/serializer/query_answer' + +describe Cadence::Workflow::Serializer::QueryAnswer do + class TestDeserializer + end + + describe 'to_proto' do + let(:query_result) { Cadence::Workflow::QueryResult.answer(42) } + it 'produces a protobuf' do + result = described_class.new(query_result).to_proto + + expect(result).to be_a(CadenceThrift::WorkflowQueryResult) + expect(result.result_type).to eq(CadenceThrift::QueryResultType.lookup( + CadenceThrift::QueryResultType::ANSWERED) + ) + expect(result.answer).to eq(JSON.serialize(42)) + end + end +end \ No newline at end of file diff --git a/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb b/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb new file mode 100644 index 00000000..feb1cd12 --- /dev/null +++ b/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb @@ -0,0 +1,19 @@ +require 'cadence/workflow/serializer/query_failure' +require 'cadence/workflow/query_result' + +describe Cadence::Workflow::Serializer::QueryFailure do + describe 'to_proto' do + let(:exception) { StandardError.new('Test query failure') } + let(:query_result) { Cadence::Workflow::QueryResult.failure(exception) } + + it 'produces a protobuf' do + result = described_class.new(query_result).to_proto + + expect(result).to be_a(CadenceThrift::WorkflowQueryResult) + expect(result.result_type).to eq(CadenceThrift::QueryResultType.lookup( + CadenceThrift::QueryResultType::FAILED) + ) + expect(result.error_message).to eq('Test query failure') + end + end +end \ No newline at end of file From 231ec2a7cf099339b946814be836c5d84f91fff0 Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Mon, 1 Aug 2022 13:06:39 -0700 Subject: [PATCH 06/12] implemented name change to_thrift among other minor changes --- lib/cadence/connection/thrift.rb | 12 ++++++------ lib/cadence/workflow/decision_task_processor.rb | 7 +++---- lib/cadence/workflow/serializer/query_answer.rb | 4 ++-- lib/cadence/workflow/serializer/query_failure.rb | 6 +++--- .../workflow/serializer/query_answer_spec.rb | 13 ++++++------- .../workflow/serializer/query_failure_spec.rb | 13 ++++++------- 6 files changed, 26 insertions(+), 29 deletions(-) diff --git a/lib/cadence/connection/thrift.rb b/lib/cadence/connection/thrift.rb index 4b1b51f7..6d5e4a52 100644 --- a/lib/cadence/connection/thrift.rb +++ b/lib/cadence/connection/thrift.rb @@ -148,12 +148,12 @@ def poll_for_decision_task(domain:, task_list:) send_request('PollForDecisionTask', request) end - def respond_decision_task_completed(task_token:, decisions:, query_results:) + def respond_decision_task_completed(task_token:, decisions:, query_results: {}) request = CadenceThrift::RespondDecisionTaskCompletedRequest.new( identity: identity, taskToken: task_token, decisions: Array(decisions), - queryResults: query_results + queryResults: query_results.transform_values { |value| Serializer.serialize(value) } ) send_request('RespondDecisionTaskCompleted', request) end @@ -345,12 +345,12 @@ def get_search_attributes end def respond_query_task_completed(task_token:, query_result:) - query_result_proto = Serializer.serialize(query_result) + query_result_thrift = Serializer.serialize(query_result) request = CadenceThrift::RespondQueryTaskCompletedRequest.new( taskToken: task_token, - completedType: query_result_proto.result_type, - queryResult: query_result_proto.answer, - errorMessage: query_result_proto.error_message, + completedType: query_result_thrift.result_type, + queryResult: query_result_thrift.answer, + errorMessage: query_result_thrift.error_message, ) client.respond_query_task_completed(request) diff --git a/lib/cadence/workflow/decision_task_processor.rb b/lib/cadence/workflow/decision_task_processor.rb index 6b4d37d1..0abf977c 100644 --- a/lib/cadence/workflow/decision_task_processor.rb +++ b/lib/cadence/workflow/decision_task_processor.rb @@ -72,7 +72,7 @@ def process private attr_reader :task, :domain, :task_token, :workflow_name, :workflow_class, - :middleware_chain, :config, :metadata + :middleware_chain, :config, :metadata def connection @connection ||= Cadence::Connection.generate(config.for_connection) @@ -125,8 +125,8 @@ def complete_task(commands, query_results) connection.respond_decision_task_completed( task_token: task_token, - # not sure what to put as the decisions here - decisions: query_results + decisions: serialize_decisions(decisions), + query_results: query_results ) end @@ -134,7 +134,6 @@ def complete_query(result) Cadence.logger.info("Workflow Query task completed", metadata.to_h) connection.respond_query_task_completed( - namespace: namespace, task_token: task_token, query_result: result ) diff --git a/lib/cadence/workflow/serializer/query_answer.rb b/lib/cadence/workflow/serializer/query_answer.rb index 8647353e..a835b264 100644 --- a/lib/cadence/workflow/serializer/query_answer.rb +++ b/lib/cadence/workflow/serializer/query_answer.rb @@ -5,9 +5,9 @@ class Workflow module Serializer class QueryAnswer < Base - def to_proto + def to_thrift CadenceThrift::WorkflowQueryResult.new( - result_type: CadenceThrift::QueryResultType::ANSWERED, + resultType: CadenceThrift::QueryResultType::ANSWERED, answer: JSON.serialize(object.result) ) end diff --git a/lib/cadence/workflow/serializer/query_failure.rb b/lib/cadence/workflow/serializer/query_failure.rb index fdc58b18..0a256ec5 100644 --- a/lib/cadence/workflow/serializer/query_failure.rb +++ b/lib/cadence/workflow/serializer/query_failure.rb @@ -4,10 +4,10 @@ module Cadence class Workflow module Serializer class QueryFailure < Base - def to_proto + def to_thrift CadenceThrift::WorkflowQueryResult.new( - result_type: CadenceThrift::QueryResultType::FAILED, - error_message: object.error.message + resultType: CadenceThrift::QueryResultType::FAILED, + errorReason: object.error.message ) end end diff --git a/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb index 5da9f2aa..6c5a1951 100644 --- a/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb +++ b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb @@ -6,16 +6,15 @@ class TestDeserializer end - describe 'to_proto' do + describe 'to_thrift' do let(:query_result) { Cadence::Workflow::QueryResult.answer(42) } - it 'produces a protobuf' do - result = described_class.new(query_result).to_proto + it 'produces a thrift object' do + result = described_class.new(query_result).to_thrift expect(result).to be_a(CadenceThrift::WorkflowQueryResult) - expect(result.result_type).to eq(CadenceThrift::QueryResultType.lookup( - CadenceThrift::QueryResultType::ANSWERED) - ) - expect(result.answer).to eq(JSON.serialize(42)) + expect(result.resultType).to eq(CadenceThrift::QueryResultType::ANSWERED + ) + expect(result.answer).to eq("42") end end end \ No newline at end of file diff --git a/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb b/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb index feb1cd12..570cb013 100644 --- a/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb +++ b/spec/unit/lib/cadence/workflow/serializer/query_failure_spec.rb @@ -2,18 +2,17 @@ require 'cadence/workflow/query_result' describe Cadence::Workflow::Serializer::QueryFailure do - describe 'to_proto' do + describe 'to_thrift' do let(:exception) { StandardError.new('Test query failure') } let(:query_result) { Cadence::Workflow::QueryResult.failure(exception) } - it 'produces a protobuf' do - result = described_class.new(query_result).to_proto + it 'produces a thrift object' do + result = described_class.new(query_result).to_thrift expect(result).to be_a(CadenceThrift::WorkflowQueryResult) - expect(result.result_type).to eq(CadenceThrift::QueryResultType.lookup( - CadenceThrift::QueryResultType::FAILED) - ) - expect(result.error_message).to eq('Test query failure') + expect(result.resultType).to eq(CadenceThrift::QueryResultType::FAILED + ) + expect(result.errorReason).to eq('Test query failure') end end end \ No newline at end of file From 730f42af82156882cdf97db00ca20f1321d6eb02 Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Wed, 10 Aug 2022 13:05:27 -0700 Subject: [PATCH 07/12] added more tests --- .../thrift/workflow_query_fabricator.rb | 2 +- .../unit/lib/cadence/workflow/context_spec.rb | 95 ++++++++++++------- .../lib/cadence/workflow/executor_spec.rb | 77 +++++++++++---- .../cadence/workflow/query_registry_spec.rb | 72 ++++++++++---- .../workflow/serializer/query_answer_spec.rb | 2 +- 5 files changed, 170 insertions(+), 78 deletions(-) diff --git a/spec/fabricators/thrift/workflow_query_fabricator.rb b/spec/fabricators/thrift/workflow_query_fabricator.rb index 03e9412c..3be62667 100644 --- a/spec/fabricators/thrift/workflow_query_fabricator.rb +++ b/spec/fabricators/thrift/workflow_query_fabricator.rb @@ -1,6 +1,6 @@ Fabricator(:api_workflow_query, from: CadenceThrift::WorkflowQuery) do query_type { 'state' } # might need to change the line below - query_args { Cadence.configuration.converter.to_payloads(['']) } + query_args { Cadence::JSON.serialize(['']) } end diff --git a/spec/unit/lib/cadence/workflow/context_spec.rb b/spec/unit/lib/cadence/workflow/context_spec.rb index ef17f7c9..e96ef4fd 100644 --- a/spec/unit/lib/cadence/workflow/context_spec.rb +++ b/spec/unit/lib/cadence/workflow/context_spec.rb @@ -4,45 +4,68 @@ require 'cadence/configuration' describe Cadence::Workflow::Context do - subject { described_class.new(state_manager, dispatcher, metadata, config, query_registry) } - - let(:state_manager) { instance_double('Cadence::Workflow::StateManager') } - let(:dispatcher) { Cadence::Workflow::Dispatcher.new } - let(:metadata_hash) do - { - domain: 'test-domain', - id: SecureRandom.uuid, - name: 'TestWorkflow', - run_id: SecureRandom.uuid, - attempt: 0, - timeouts: { execution: 15, task: 10 }, - headers: { 'TestHeader' => 'Value' } - } - end - let(:metadata) { Cadence::Metadata::Workflow.new(metadata_hash) } - let(:config) { Cadence::Configuration.new } - let(:query_registry) { instance_double('Cadence::Workflow::QueryRegistry') } + subject { described_class.new(state_manager, dispatcher, metadata, config, query_registry) } + + let(:state_manager) { instance_double('Cadence::Workflow::StateManager') } + let(:dispatcher) { Cadence::Workflow::Dispatcher.new } + let(:metadata_hash) do + { + domain: 'test-domain', + id: SecureRandom.uuid, + name: 'TestWorkflow', + run_id: SecureRandom.uuid, + attempt: 0, + timeouts: { execution: 15, task: 10 }, + headers: { 'TestHeader' => 'Value' } + } + end + let(:metadata) { Cadence::Workflow.new(metadata_hash) } + let(:config) { Cadence::Configuration.new } + let(:query_registry) { instance_double('Cadence::Workflow::QueryRegistry') } + let(:workflow_context) do + Cadence::Workflow::Context.new( + state_manager, + dispatcher, + query_registry, + metadata, + Cadence.configuration + ) + end + + describe '#on_query' do + let(:handler) { Proc.new {} } + + before { allow(query_registry).to receive(:register) } + + it 'registers a query with the query registry' do + workflow_context.on_query('test-query', &handler) - describe '#headers' do - it 'returns metadata headers' do - expect(subject.headers).to eq('TestHeader' => 'Value') + expect(query_registry).to have_received(:register).with('test-query') do |&block| + expect(block).to eq(handler) end end + end + + describe '#headers' do + it 'returns metadata headers' do + expect(subject.headers).to eq('TestHeader' => 'Value') + end + end + + describe '.sleep_until' do + let(:start_time) { Time.now } + let(:end_time) { Time.now + 1 } + let(:delay_time) { (end_time - start_time).to_i } + + before do + allow(state_manager).to receive(:local_time).and_return(start_time) + allow(subject).to receive(:sleep) + end - describe '.sleep_until' do - let(:start_time) { Time.now} - let(:end_time) { Time.now + 1} - let(:delay_time) { (end_time-start_time).to_i } - - before do - allow(state_manager).to receive(:local_time).and_return(start_time) - allow(subject).to receive(:sleep) - end - - it 'sleeps until the given end_time' do - subject.sleep_until(end_time) - # Since sleep_until uses, sleep, just make sure that sleep is called with the delay_time - expect(subject).to have_received(:sleep).with(delay_time) - end + it 'sleeps until the given end_time' do + subject.sleep_until(end_time) + # Since sleep_until uses, sleep, just make sure that sleep is called with the delay_time + expect(subject).to have_received(:sleep).with(delay_time) end + end end diff --git a/spec/unit/lib/cadence/workflow/executor_spec.rb b/spec/unit/lib/cadence/workflow/executor_spec.rb index e4e6cd6d..7f4e7552 100644 --- a/spec/unit/lib/cadence/workflow/executor_spec.rb +++ b/spec/unit/lib/cadence/workflow/executor_spec.rb @@ -1,6 +1,8 @@ require 'cadence/workflow/executor' require 'cadence/workflow/history' require 'cadence/workflow' +require 'cadence/workflow/decision_task_processor' +require 'cadence/workflow/query_registry' describe Cadence::Workflow::Executor do subject { described_class.new(workflow, history, decision_metadata, config) } @@ -8,11 +10,11 @@ let(:workflow_started_event) { Fabricate(:workflow_execution_started_event_thrift, eventId: 1) } let(:history) do Cadence::Workflow::History.new([ - workflow_started_event, - Fabricate(:decision_task_scheduled_event_thrift, eventId: 2), - Fabricate(:decision_task_started_event_thrift, eventId: 3), - Fabricate(:decision_task_completed_event_thrift, eventId: 4) - ]) + workflow_started_event, + Fabricate(:decision_task_scheduled_event_thrift, eventId: 2), + Fabricate(:decision_task_started_event_thrift, eventId: 3), + Fabricate(:decision_task_completed_event_thrift, eventId: 4) + ]) end let(:workflow) { TestWorkflow } let(:decision_metadata) { Fabricate(:decision_metadata) } @@ -32,10 +34,10 @@ def execute expect(workflow) .to have_received(:execute_in_context) - .with( - an_instance_of(Cadence::Workflow::Context), - nil - ) + .with( + an_instance_of(Cadence::Workflow::Context), + nil + ) end it 'returns a complete workflow decision' do @@ -59,20 +61,53 @@ def execute event_attributes = workflow_started_event.workflowExecutionStartedEventAttributes expect(Cadence::Metadata::Workflow) .to have_received(:new) - .with( - domain: decision_metadata.domain, - id: decision_metadata.workflow_id, - name: event_attributes.workflowType.name, - run_id: event_attributes.originalExecutionRunId, - attempt: event_attributes.attempt, - headers: { 'Foo' => 'Bar' }, - timeouts: { - execution: event_attributes.executionStartToCloseTimeoutSeconds, - task: event_attributes.taskStartToCloseTimeoutSeconds - } - ) + .with( + domain: decision_metadata.domain, + id: decision_metadata.workflow_id, + name: event_attributes.workflowType.name, + run_id: event_attributes.originalExecutionRunId, + attempt: event_attributes.attempt, + headers: { 'Foo' => 'Bar' }, + timeouts: { + execution: event_attributes.executionStartToCloseTimeoutSeconds, + task: event_attributes.taskStartToCloseTimeoutSeconds + } + ) + end + end + + describe '#process_queries' do + let(:query_registry) { Cadence::Workflow::QueryRegistry.new } + let(:query_1_result) { 42 } + let(:query_2_error) { StandardError.new('Test query failure') } + let(:queries) do + { + '1' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'success')), + '2' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'failure')), + '3' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'unknown')), + } + end + + before do + allow(Cadence::Workflow::QueryRegistry).to receive(:new).and_return(query_registry) + query_registry.register('success') { query_1_result } + query_registry.register('failure') { raise query_2_error } + end + + it 'returns query results' do + results = subject.process_queries(queries) + + expect(results.length).to eq(3) + expect(results['1']).to be_a(Cadence::Workflow::QueryResult::Answer) + expect(results['1'].result).to eq(query_1_result) + expect(results['2']).to be_a(Cadence::Workflow::QueryResult::Failure) + expect(results['2'].error).to eq(query_2_error) + expect(results['3']).to be_a(Cadence::Workflow::QueryResult::Failure) + expect(results['3'].error).to be_a(Cadence::QueryFailed) + expect(results['3'].error.message).to eq('Workflow did not register a handler for unknown') end end end + diff --git a/spec/unit/lib/cadence/workflow/query_registry_spec.rb b/spec/unit/lib/cadence/workflow/query_registry_spec.rb index 07950642..7dfdb72d 100644 --- a/spec/unit/lib/cadence/workflow/query_registry_spec.rb +++ b/spec/unit/lib/cadence/workflow/query_registry_spec.rb @@ -1,33 +1,67 @@ -require 'cadence/errors' +require 'cadence/workflow/query_registry' -module Cadence - class Workflow - class QueryRegistry - def initialize - @handlers = {} +describe Cadence::Workflow::QueryRegistry do + subject { described_class.new } + + describe '#register' do + let(:handler) { Proc.new {} } + + it 'registers a query handler' do + subject.register('test-query', &handler) + + expect(subject.send(:handlers)['test-query']).to eq(handler) + end + + context 'when query handler is already registered' do + let(:handler_2) { Proc.new {} } + + before { subject.register('test-query', &handler) } + + it 'warns' do + allow(subject).to receive(:warn) + + subject.register('test-query', &handler_2) + + expect(subject) + .to have_received(:warn) + .with('[NOTICE] Overwriting a query handler for test-query') end - def register(type, &handler) - if handlers.key?(type) - warn "[NOTICE] Overwriting a query handler for #{type}" - end + it 're-registers a query handler' do + subject.register('test-query', &handler_2) - handlers[type] = handler + expect(subject.send(:handlers)['test-query']).to eq(handler_2) end + end + end - def handle(type, args = nil) - handler = handlers[type] + describe '#handle' do + context 'when a query handler has been registered' do + let(:handler) { Proc.new { 42 } } - unless handler - raise Cadence::QueryFailed, "Workflow did not register a handler for #{type}" - end + before { subject.register('test-query', &handler) } - handler.call(*args) + it 'runs the handler and returns the result' do + expect(subject.handle('test-query')).to eq(42) end + end + + context 'when a query handler has been registered with args' do + let(:handler) { Proc.new { |arg_1, arg_2| arg_1 + arg_2 } } - private + before { subject.register('test-query', &handler) } - attr_reader :handlers + it 'runs the handler and returns the result' do + expect(subject.handle('test-query', [3, 5])).to eq(8) + end + end + + context 'when a query handler has not been registered' do + it 'raises' do + expect do + subject.handle('test-query') + end.to raise_error(Cadence::QueryFailed, 'Workflow did not register a handler for test-query') + end end end end \ No newline at end of file diff --git a/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb index 6c5a1951..4611937b 100644 --- a/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb +++ b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb @@ -14,7 +14,7 @@ class TestDeserializer expect(result).to be_a(CadenceThrift::WorkflowQueryResult) expect(result.resultType).to eq(CadenceThrift::QueryResultType::ANSWERED ) - expect(result.answer).to eq("42") + expect(result.answer).to eq(Cadence::JSON.serialize("42")) end end end \ No newline at end of file From cd6ae7a71412bc1f4ee83e7127e722a3622775c1 Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Wed, 17 Aug 2022 10:55:30 -0700 Subject: [PATCH 08/12] more testing progress --- examples/spec/integration/query_workflow_spec.rb | 2 +- lib/cadence/client.rb | 3 ++- lib/cadence/connection/thrift.rb | 12 +++++++----- lib/cadence/workflow/executor.rb | 2 +- spec/fabricators/thrift/workflow_query_fabricator.rb | 4 ++-- spec/unit/lib/cadence/workflow/context_spec.rb | 8 +++++--- .../cadence/workflow/decision_task_processor_spec.rb | 4 +++- spec/unit/lib/cadence/workflow/executor_spec.rb | 6 +++--- 8 files changed, 24 insertions(+), 17 deletions(-) diff --git a/examples/spec/integration/query_workflow_spec.rb b/examples/spec/integration/query_workflow_spec.rb index 89c10e2c..385b856f 100644 --- a/examples/spec/integration/query_workflow_spec.rb +++ b/examples/spec/integration/query_workflow_spec.rb @@ -1,4 +1,4 @@ -require 'examples/workflows/query_workflow' +require 'workflows/query_workflow' require 'cadence/errors' diff --git a/lib/cadence/client.rb b/lib/cadence/client.rb index 0979e7a1..7ce12168 100644 --- a/lib/cadence/client.rb +++ b/lib/cadence/client.rb @@ -76,8 +76,9 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil) end def query_workflow(workflow, query, workflow_id, run_id, *args, domain: nil, query_reject_condition: nil) + execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) connection.query_workflow( - domain: domain || workflow.domain, + domain: domain || execution_options.domain, workflow_id: workflow_id, run_id: run_id, query: query, diff --git a/lib/cadence/connection/thrift.rb b/lib/cadence/connection/thrift.rb index 6d5e4a52..87004286 100644 --- a/lib/cadence/connection/thrift.rb +++ b/lib/cadence/connection/thrift.rb @@ -364,12 +364,12 @@ def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reje request = CadenceThrift::QueryWorkflowRequest.new( domain: domain, execution: CadenceThrift::WorkflowExecution.new( - workflow_id: workflow_id, - run_id: run_id + workflowId: workflow_id, + runId: run_id ), query: CadenceThrift::WorkflowQuery.new( - query_type: query, - query_args: JSON.serialize(args) + queryType: query, + queryArgs: JSON.serialize(args) ) ) if query_reject_condition @@ -381,7 +381,9 @@ def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reje begin response = client.query_workflow(request) - rescue Cadence::InvalidArgument => e + # rescue InvalidArgument => e doesn't seem to work + # + rescue InvalidArgumentError => e raise Cadence::QueryFailed, e.details end diff --git a/lib/cadence/workflow/executor.rb b/lib/cadence/workflow/executor.rb index e628bbb0..65136234 100644 --- a/lib/cadence/workflow/executor.rb +++ b/lib/cadence/workflow/executor.rb @@ -51,7 +51,7 @@ def process_queries(queries = {}) attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config, :query_registry def process_query(query) - result = query_registry.handle(query.query_type, query.query_args) + result = query_registry.handle(query.queryType, query.queryArgs) QueryResult.answer(result) rescue StandardError => error diff --git a/spec/fabricators/thrift/workflow_query_fabricator.rb b/spec/fabricators/thrift/workflow_query_fabricator.rb index 3be62667..b198ce5b 100644 --- a/spec/fabricators/thrift/workflow_query_fabricator.rb +++ b/spec/fabricators/thrift/workflow_query_fabricator.rb @@ -1,6 +1,6 @@ Fabricator(:api_workflow_query, from: CadenceThrift::WorkflowQuery) do - query_type { 'state' } + queryType { 'state' } # might need to change the line below - query_args { Cadence::JSON.serialize(['']) } + queryArgs { Cadence::JSON.serialize(['']) } end diff --git a/spec/unit/lib/cadence/workflow/context_spec.rb b/spec/unit/lib/cadence/workflow/context_spec.rb index e96ef4fd..a66f77d7 100644 --- a/spec/unit/lib/cadence/workflow/context_spec.rb +++ b/spec/unit/lib/cadence/workflow/context_spec.rb @@ -3,6 +3,8 @@ require 'cadence/workflow/dispatcher' require 'cadence/configuration' +class MyTestWorkflow < Cadence::Workflow; end + describe Cadence::Workflow::Context do subject { described_class.new(state_manager, dispatcher, metadata, config, query_registry) } @@ -26,9 +28,9 @@ Cadence::Workflow::Context.new( state_manager, dispatcher, - query_registry, metadata, - Cadence.configuration + Cadence.configuration, + query_registry, ) end @@ -48,7 +50,7 @@ describe '#headers' do it 'returns metadata headers' do - expect(subject.headers).to eq('TestHeader' => 'Value') + expect(workflow_context.headers).to eq('TestHeader' => 'Value') end end diff --git a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb index f7e7f0b2..1ee8100c 100644 --- a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb +++ b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb @@ -13,7 +13,9 @@ class TestWorkflow < Cadence::Workflow; end let(:query) { nil } let(:queries) { nil } - let(:task) { Fabricate(:api_workflow_task, { workflow_type: api_workflow_type, query: query, queries: queries }.compact) } + # let(:task) { Fabricate(:api_workflow_task, { workflow_type: api_workflow_type, query: query, queries: queries }.compact) } + let(:task) { Fabricate(:decision_task_thrift) } + # let(:api_workflow_type) { Fabricate(:api_workflow_type, name: workflow_name) } let(:domain) { 'test-domain' } let(:lookup) { Cadence::ExecutableLookup.new } let(:connection) do diff --git a/spec/unit/lib/cadence/workflow/executor_spec.rb b/spec/unit/lib/cadence/workflow/executor_spec.rb index 7f4e7552..592e6b95 100644 --- a/spec/unit/lib/cadence/workflow/executor_spec.rb +++ b/spec/unit/lib/cadence/workflow/executor_spec.rb @@ -82,9 +82,9 @@ def execute let(:query_2_error) { StandardError.new('Test query failure') } let(:queries) do { - '1' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'success')), - '2' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'failure')), - '3' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, query_type: 'unknown')), + '1' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, queryType: 'success')), + '2' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, queryType: 'failure')), + '3' => Cadence::Workflow::DecisionTaskProcessor::Query.new(Fabricate(:api_workflow_query, queryType: 'unknown')), } end From e4a17d95bc6d26e244670b6908a10ac69ead633d Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Thu, 18 Aug 2022 10:08:19 -0700 Subject: [PATCH 09/12] initial changes to tests --- lib/cadence/workflow/decision_task_processor.rb | 4 ++-- lib/cadence/workflow/executor.rb | 4 ++-- lib/cadence/workflow/query_registry.rb | 1 + lib/cadence/workflow/serializer/query_answer.rb | 1 - spec/unit/lib/cadence/workflow/context_spec.rb | 3 ++- spec/unit/lib/cadence/workflow/executor_spec.rb | 1 - .../lib/cadence/workflow/serializer/query_answer_spec.rb | 7 ++++++- 7 files changed, 13 insertions(+), 8 deletions(-) diff --git a/lib/cadence/workflow/decision_task_processor.rb b/lib/cadence/workflow/decision_task_processor.rb index 0abf977c..40279911 100644 --- a/lib/cadence/workflow/decision_task_processor.rb +++ b/lib/cadence/workflow/decision_task_processor.rb @@ -10,11 +10,11 @@ class DecisionTaskProcessor Query = Struct.new(:query) do def query_type - query.query_type + query.queryType end def query_args - JSON.deserialize(query.query_args) + JSON.deserialize(query.queryArgs) end end diff --git a/lib/cadence/workflow/executor.rb b/lib/cadence/workflow/executor.rb index 65136234..de7013ab 100644 --- a/lib/cadence/workflow/executor.rb +++ b/lib/cadence/workflow/executor.rb @@ -43,6 +43,7 @@ def run # # @return [Hash] def process_queries(queries = {}) + puts("I made it to process queries") queries.transform_values(&method(:process_query)) end @@ -51,8 +52,7 @@ def process_queries(queries = {}) attr_reader :workflow_class, :dispatcher, :state_manager, :metadata, :history, :config, :query_registry def process_query(query) - result = query_registry.handle(query.queryType, query.queryArgs) - + result = query_registry.handle(query.query_type, query.query_args) QueryResult.answer(result) rescue StandardError => error QueryResult.failure(error) diff --git a/lib/cadence/workflow/query_registry.rb b/lib/cadence/workflow/query_registry.rb index e9c655fb..f3d472e5 100644 --- a/lib/cadence/workflow/query_registry.rb +++ b/lib/cadence/workflow/query_registry.rb @@ -17,6 +17,7 @@ def register(type, &handler) def handle(type, args = nil) handler = handlers[type] + puts(handler) unless handler raise Cadence::QueryFailed, "Workflow did not register a handler for #{type}" diff --git a/lib/cadence/workflow/serializer/query_answer.rb b/lib/cadence/workflow/serializer/query_answer.rb index a835b264..62fc63f1 100644 --- a/lib/cadence/workflow/serializer/query_answer.rb +++ b/lib/cadence/workflow/serializer/query_answer.rb @@ -4,7 +4,6 @@ module Cadence class Workflow module Serializer class QueryAnswer < Base - def to_thrift CadenceThrift::WorkflowQueryResult.new( resultType: CadenceThrift::QueryResultType::ANSWERED, diff --git a/spec/unit/lib/cadence/workflow/context_spec.rb b/spec/unit/lib/cadence/workflow/context_spec.rb index a66f77d7..f461054b 100644 --- a/spec/unit/lib/cadence/workflow/context_spec.rb +++ b/spec/unit/lib/cadence/workflow/context_spec.rb @@ -2,6 +2,7 @@ require 'cadence/workflow/context' require 'cadence/workflow/dispatcher' require 'cadence/configuration' +require 'cadence/metadata/workflow' class MyTestWorkflow < Cadence::Workflow; end @@ -21,7 +22,7 @@ class MyTestWorkflow < Cadence::Workflow; end headers: { 'TestHeader' => 'Value' } } end - let(:metadata) { Cadence::Workflow.new(metadata_hash) } + let(:metadata) { Cadence::Metadata::Workflow.new(metadata_hash) } let(:config) { Cadence::Configuration.new } let(:query_registry) { instance_double('Cadence::Workflow::QueryRegistry') } let(:workflow_context) do diff --git a/spec/unit/lib/cadence/workflow/executor_spec.rb b/spec/unit/lib/cadence/workflow/executor_spec.rb index 592e6b95..72943bb0 100644 --- a/spec/unit/lib/cadence/workflow/executor_spec.rb +++ b/spec/unit/lib/cadence/workflow/executor_spec.rb @@ -96,7 +96,6 @@ def execute it 'returns query results' do results = subject.process_queries(queries) - expect(results.length).to eq(3) expect(results['1']).to be_a(Cadence::Workflow::QueryResult::Answer) expect(results['1'].result).to eq(query_1_result) diff --git a/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb index 4611937b..5c57b83f 100644 --- a/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb +++ b/spec/unit/lib/cadence/workflow/serializer/query_answer_spec.rb @@ -8,13 +8,18 @@ class TestDeserializer describe 'to_thrift' do let(:query_result) { Cadence::Workflow::QueryResult.answer(42) } + let(:query_workflow_result) do + CadenceThrift::WorkflowQueryResult.new( + answer: 42 + ) + end it 'produces a thrift object' do result = described_class.new(query_result).to_thrift expect(result).to be_a(CadenceThrift::WorkflowQueryResult) expect(result.resultType).to eq(CadenceThrift::QueryResultType::ANSWERED ) - expect(result.answer).to eq(Cadence::JSON.serialize("42")) + expect(result.answer).to eq(Cadence::JSON.serialize(query_workflow_result.answer)) end end end \ No newline at end of file From 7d7162c802d4b89fd52ee323150f85894418ceba Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Fri, 19 Aug 2022 10:56:48 -0700 Subject: [PATCH 10/12] most up to date tests --- lib/cadence/connection/thrift.rb | 3 ++- lib/cadence/metadata.rb | 2 ++ .../thrift/decision_task_fabricator.rb | 1 + .../lib/cadence/connection/thrift_spec.rb | 2 +- .../workflow/decision_task_processor_spec.rb | 21 +++++++++++++++---- 5 files changed, 23 insertions(+), 6 deletions(-) diff --git a/lib/cadence/connection/thrift.rb b/lib/cadence/connection/thrift.rb index 87004286..ff83e1fd 100644 --- a/lib/cadence/connection/thrift.rb +++ b/lib/cadence/connection/thrift.rb @@ -381,9 +381,10 @@ def query_workflow(domain:, workflow_id:, run_id:, query:, args: nil, query_reje begin response = client.query_workflow(request) + puts(response) # rescue InvalidArgument => e doesn't seem to work # - rescue InvalidArgumentError => e + rescue Error => e raise Cadence::QueryFailed, e.details end diff --git a/lib/cadence/metadata.rb b/lib/cadence/metadata.rb index 03009794..8d54c6e3 100644 --- a/lib/cadence/metadata.rb +++ b/lib/cadence/metadata.rb @@ -42,6 +42,7 @@ def activity_metadata_from(task, domain) end def decision_metadata_from(task, domain) + puts(task.workflowType.class) Metadata::Decision.new( domain: domain, id: task.startedEventId, @@ -49,6 +50,7 @@ def decision_metadata_from(task, domain) attempt: task.attempt, workflow_run_id: task.workflowExecution.runId, workflow_id: task.workflowExecution.workflowId, + # task.workflowType.name workflow_name: task.workflowType.name ) end diff --git a/spec/fabricators/thrift/decision_task_fabricator.rb b/spec/fabricators/thrift/decision_task_fabricator.rb index c100bbc1..eb30fff6 100644 --- a/spec/fabricators/thrift/decision_task_fabricator.rb +++ b/spec/fabricators/thrift/decision_task_fabricator.rb @@ -14,4 +14,5 @@ history { Fabricate(:history_thrift) } nextPageToken nil attempt 1 + query { nil } end diff --git a/spec/unit/lib/cadence/connection/thrift_spec.rb b/spec/unit/lib/cadence/connection/thrift_spec.rb index 4006c6a3..b3b6194e 100644 --- a/spec/unit/lib/cadence/connection/thrift_spec.rb +++ b/spec/unit/lib/cadence/connection/thrift_spec.rb @@ -223,4 +223,4 @@ end end end -end +end \ No newline at end of file diff --git a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb index 1ee8100c..d68fb069 100644 --- a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb +++ b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb @@ -13,9 +13,10 @@ class TestWorkflow < Cadence::Workflow; end let(:query) { nil } let(:queries) { nil } - # let(:task) { Fabricate(:api_workflow_task, { workflow_type: api_workflow_type, query: query, queries: queries }.compact) } - let(:task) { Fabricate(:decision_task_thrift) } - # let(:api_workflow_type) { Fabricate(:api_workflow_type, name: workflow_name) } + let(:task) { Fabricate(:decision_task_thrift, { workflowType: :workflow_type_thrift, query: query, queries: queries }.compact) } + # let(:task) { Fabricate(:decision_task_thrift) } + let(:workflow_type_thrift) { Fabricate(:workflow_type_thrift, name: workflow_name) } + let(:workflow_name) { 'TestWorkflow' } let(:domain) { 'test-domain' } let(:lookup) { Cadence::ExecutableLookup.new } let(:connection) do @@ -40,7 +41,7 @@ class TestWorkflow < Cadence::Workflow; end allow(Cadence.logger).to receive(:error) allow(Cadence.logger).to receive(:debug) allow(Cadence::ErrorHandler).to receive(:handle) - + allow(connection).to receive(:respond_query_task_completed) allow(Cadence::Metadata) .to receive(:generate) .with(Cadence::Metadata::DECISION_TYPE, task, domain) @@ -55,6 +56,7 @@ class TestWorkflow < Cadence::Workflow; end .to receive(:new) .with(TestWorkflow, an_instance_of(Cadence::Workflow::History), metadata, config) .and_return(executor) + allow(executor).to receive(:process_queries) end it 'runs the workflow executor' do @@ -205,4 +207,15 @@ class TestWorkflow < Cadence::Workflow; end expect(connection).not_to have_received(:respond_decision_task_failed) end end + + context 'when workflow task queries are included' do + let(:query_id) { SecureRandom.uuid } + let(:query_result) { Cadence::Workflow::QueryResult.answer(42) } + + let(:queries) do + end + CadenceThrift::Map.new(:string, :message, CadenceThrift::WorkflowQuery).tap do |map| + map[query_id] = Fabricate(:api_workflow_query) + end + end end From e60e9b58fa48dfe3722e3ffe7a3504299877c0d8 Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Fri, 19 Aug 2022 16:49:07 -0700 Subject: [PATCH 11/12] suggested changes --- lib/cadence/metadata.rb | 2 +- lib/cadence/workflow/decision_task_processor.rb | 5 +++-- lib/cadence/workflow/executor.rb | 1 - .../workflow/decision_task_processor_spec.rb | 14 +------------- 4 files changed, 5 insertions(+), 17 deletions(-) diff --git a/lib/cadence/metadata.rb b/lib/cadence/metadata.rb index 8d54c6e3..be28b60b 100644 --- a/lib/cadence/metadata.rb +++ b/lib/cadence/metadata.rb @@ -42,7 +42,7 @@ def activity_metadata_from(task, domain) end def decision_metadata_from(task, domain) - puts(task.workflowType.class) + puts(task.workflowType.instance_variables) Metadata::Decision.new( domain: domain, id: task.startedEventId, diff --git a/lib/cadence/workflow/decision_task_processor.rb b/lib/cadence/workflow/decision_task_processor.rb index 40279911..8b5b9942 100644 --- a/lib/cadence/workflow/decision_task_processor.rb +++ b/lib/cadence/workflow/decision_task_processor.rb @@ -56,7 +56,7 @@ def process if legacy_query_task? complete_query(query_results[LEGACY_QUERY_KEY]) else - complete_task(commands, query_results) + complete_task(decisions, query_results) end rescue StandardError => error @@ -114,13 +114,14 @@ def parse_queries if legacy_query_task? { LEGACY_QUERY_KEY => Query.new(task.query) } else + return {} if task.queries.nil? task.queries.each_with_object({}) do |(query_id, query), result| result[query_id] = Query.new(query) end end end - def complete_task(commands, query_results) + def complete_task(decisions, query_results) Cadence.logger.info("Decision task for #{workflow_name} completed") connection.respond_decision_task_completed( diff --git a/lib/cadence/workflow/executor.rb b/lib/cadence/workflow/executor.rb index de7013ab..ff09b8b0 100644 --- a/lib/cadence/workflow/executor.rb +++ b/lib/cadence/workflow/executor.rb @@ -43,7 +43,6 @@ def run # # @return [Hash] def process_queries(queries = {}) - puts("I made it to process queries") queries.transform_values(&method(:process_query)) end diff --git a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb index d68fb069..a44e6048 100644 --- a/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb +++ b/spec/unit/lib/cadence/workflow/decision_task_processor_spec.rb @@ -13,8 +13,7 @@ class TestWorkflow < Cadence::Workflow; end let(:query) { nil } let(:queries) { nil } - let(:task) { Fabricate(:decision_task_thrift, { workflowType: :workflow_type_thrift, query: query, queries: queries }.compact) } - # let(:task) { Fabricate(:decision_task_thrift) } + let(:task) { Fabricate(:decision_task_thrift, { workflowType: workflow_type_thrift, query: query, queries: queries }.compact) } let(:workflow_type_thrift) { Fabricate(:workflow_type_thrift, name: workflow_name) } let(:workflow_name) { 'TestWorkflow' } let(:domain) { 'test-domain' } @@ -207,15 +206,4 @@ class TestWorkflow < Cadence::Workflow; end expect(connection).not_to have_received(:respond_decision_task_failed) end end - - context 'when workflow task queries are included' do - let(:query_id) { SecureRandom.uuid } - let(:query_result) { Cadence::Workflow::QueryResult.answer(42) } - - let(:queries) do - end - CadenceThrift::Map.new(:string, :message, CadenceThrift::WorkflowQuery).tap do |map| - map[query_id] = Fabricate(:api_workflow_query) - end - end end From 178f2a71d9da12b24b6e13cbe400c595c8b3b839 Mon Sep 17 00:00:00 2001 From: Daniel Jung Date: Mon, 22 Aug 2022 08:47:50 -0700 Subject: [PATCH 12/12] removed puts statement --- lib/cadence/metadata.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/cadence/metadata.rb b/lib/cadence/metadata.rb index be28b60b..03009794 100644 --- a/lib/cadence/metadata.rb +++ b/lib/cadence/metadata.rb @@ -42,7 +42,6 @@ def activity_metadata_from(task, domain) end def decision_metadata_from(task, domain) - puts(task.workflowType.instance_variables) Metadata::Decision.new( domain: domain, id: task.startedEventId, @@ -50,7 +49,6 @@ def decision_metadata_from(task, domain) attempt: task.attempt, workflow_run_id: task.workflowExecution.runId, workflow_id: task.workflowExecution.workflowId, - # task.workflowType.name workflow_name: task.workflowType.name ) end