diff --git a/apps/hellgate/src/hellgate.app.src b/apps/hellgate/src/hellgate.app.src index e6178f467..5c13b3aa8 100644 --- a/apps/hellgate/src/hellgate.app.src +++ b/apps/hellgate/src/hellgate.app.src @@ -12,6 +12,7 @@ fault_detector_proto, hg_proto, shumpune_proto, + shumaich_proto, cowboy, how_are_you, % must be after ranch and before any woody usage woody, @@ -19,6 +20,7 @@ gproc, dmt_client, party_client, + bender_client, woody_user_identity, payproc_errors, erl_health, diff --git a/apps/hellgate/src/hg_accounting_new.erl b/apps/hellgate/src/hg_accounting_new.erl new file mode 100644 index 000000000..6ae932b1e --- /dev/null +++ b/apps/hellgate/src/hg_accounting_new.erl @@ -0,0 +1,297 @@ +%%% Accounting for shumaich +%%% +%%% TODO +%%% - Brittle posting id assignment, it should be a level upper, maybe even in +%%% `hg_cashflow`. +%%% - Stuff cash flow details in the posting description fields. + +-module(hg_accounting_new). + +-export([get_account/1]). +-export([get_account/2]). + +-export([get_balance/1]). +-export([get_balance/2]). + +-export([collect_account_map/6]). +-export([collect_merchant_account_map/2]). +-export([collect_provider_account_map/3]). +-export([collect_system_account_map/4]). +-export([collect_external_account_map/4]). + +-export([hold/4]). + +-export([plan/3]). +-export([plan/4]). + +-export([commit/4]). + +-export([rollback/4]). + +-include_lib("damsel/include/dmsl_payment_processing_thrift.hrl"). +-include_lib("damsel/include/dmsl_domain_thrift.hrl"). +-include_lib("shumaich_proto/include/shumaich_shumaich_thrift.hrl"). + +-type amount() :: dmsl_domain_thrift:'Amount'(). +-type currency_code() :: dmsl_domain_thrift:'CurrencySymbolicCode'(). +-type account_id() :: dmsl_accounter_thrift:'AccountID'(). +-type plan_id() :: dmsl_accounter_thrift:'PlanID'(). +-type batch_id() :: dmsl_accounter_thrift:'BatchID'(). +-type final_cash_flow() :: dmsl_domain_thrift:'FinalCashFlow'(). +-type batch() :: {batch_id(), final_cash_flow()}. +-type clock() :: dmsl_domain_thrift:'AccounterClock'(). + +-type payment() :: dmsl_domain_thrift:'InvoicePayment'(). +-type shop() :: dmsl_domain_thrift:'Shop'(). +-type payment_institution() :: dmsl_domain_thrift:'PaymentInstitution'(). +-type provider() :: dmsl_domain_thrift:'Provider'(). +-type varset() :: pm_selector:varset(). +-type revision() :: hg_domain:revision(). + +-export_type([batch/0]). +-export_type([clock/0]). + +-type account() :: #{ + account_id => account_id(), + currency_code => currency_code() +}. + +-type balance() :: #{ + account_id => account_id(), + own_amount => amount(), + min_available_amount => amount(), + max_available_amount => amount(), + clock => clock() +}. + +-define(DEFAULT_RETRY_STRATEGY, {exponential, 10, 1.1, 100}). + +-spec get_account(account_id()) -> {ok, account()} | {error, not_ready | {account_not_found, _}}. +get_account(AccountID) -> + get_account(AccountID, undefined). + +-spec get_account(account_id(), undefined | clock()) -> {ok, account()} | {error, not_ready | {account_not_found, _}}. +get_account(AccountID, Clock) -> + case call_accounter('GetAccountByID', {AccountID, to_accounter_clock(Clock)}) of + {ok, Result} -> + {ok, construct_account(AccountID, Result)}; + {error, _} = Error -> + Error + end. + +-spec get_balance(account_id()) -> {ok, balance()} | {error, not_ready | {account_not_found, _}}. +get_balance(AccountID) -> + get_balance(AccountID, undefined). + +-spec get_balance(account_id(), undefined | clock()) -> {ok, balance()} | {error, not_ready | {account_not_found, _}}. +get_balance(AccountID, Clock) -> + case call_accounter('GetBalanceByID', {AccountID, to_accounter_clock(Clock)}) of + {ok, Result} -> + {ok, construct_balance(AccountID, Result)}; + {error, _} = Error -> + Error + end. + +-spec collect_account_map(payment(), shop(), payment_institution(), provider(), varset(), revision()) -> map(). +collect_account_map(Payment, Shop, PaymentInstitution, Provider, VS, Revision) -> + hg_accounting:collect_account_map(Payment, Shop, PaymentInstitution, Provider, VS, Revision). + +-spec collect_merchant_account_map(shop(), map()) -> map(). +collect_merchant_account_map(Shop, Acc) -> + hg_accounting:collect_merchant_account_map(Shop, Acc). + +-spec collect_provider_account_map(payment(), provider(), map()) -> map(). +collect_provider_account_map(Payment, Provider, Acc) -> + hg_accounting:collect_provider_account_map(Payment, Provider, Acc). + +-spec collect_system_account_map(payment(), payment_institution(), revision(), map()) -> map(). +collect_system_account_map(Payment, PaymentInstitution, Revision, Acc) -> + hg_accounting:collect_system_account_map(Payment, PaymentInstitution, Revision, Acc). + +-spec collect_external_account_map(payment(), varset(), revision(), map()) -> map(). +collect_external_account_map(Payment, VS, Revision, Acc) -> + hg_accounting:collect_external_account_map(Payment, VS, Revision, Acc). + +%% +-spec plan(plan_id(), [batch()], hg_datetime:timestamp()) -> + {ok, clock()} | {error, not_ready | {invalid_posting_params, _}}. +plan(_PlanID, [], _Timestamp) -> + error(badarg); +plan(_PlanID, Batches, _Timestamp) when not is_list(Batches) -> + error(badarg); +plan(PlanID, Batches, Timestamp) -> + execute_plan(PlanID, Batches, Timestamp, undefined). + +-spec plan(plan_id(), [batch()], hg_datetime:timestamp(), clock()) -> + {ok, clock()} | {error, not_ready | {invalid_posting_params, _}}. +plan(_PlanID, [], _Timestamp, _Clock) -> + error(badarg); +plan(_PlanID, Batches, _Timestamp, _Clock) when not is_list(Batches) -> + error(badarg); +plan(PlanID, Batches, Timestamp, Clock) -> + execute_plan(PlanID, Batches, Timestamp, Clock). + +-spec hold(plan_id(), batch(), hg_datetime:timestamp(), clock() | undefined) -> + {ok, clock()} | {error, not_ready | {invalid_posting_params, _}}. +hold(PlanID, Batch, Timestamp, Clock) -> + AccounterClock = to_accounter_clock(Clock), + do('Hold', construct_plan_change(PlanID, Batch, Timestamp), AccounterClock). + +-spec commit(plan_id(), [batch()], hg_datetime:timestamp(), clock() | undefined) -> + {ok, clock()} | {error, not_ready | {invalid_posting_params, _}}. +commit(PlanID, Batches, Timestamp, Clock) -> + AccounterClock = to_accounter_clock(Clock), + do('CommitPlan', construct_plan(PlanID, Batches, Timestamp), AccounterClock). + +-spec rollback(plan_id(), [batch()], hg_datetime:timestamp(), clock() | undefined) -> + {ok, clock()} | {error, not_ready | {invalid_posting_params, _}}. +rollback(PlanID, Batches, Timestamp, Clock) -> + AccounterClock = to_accounter_clock(Clock), + do('RollbackPlan', construct_plan(PlanID, Batches, Timestamp), AccounterClock). + +do(Op, Plan, PreviousClock) -> + case call_accounter(Op, {Plan, PreviousClock}) of + {ok, Clock} -> + {ok, to_domain_clock(Clock)}; + {error, _} = Error -> + Error + end. + +execute_plan(_PlanID, [], _Timestamp, Clock) -> + {ok, Clock}; +execute_plan(PlanID, [Batch | Rest], Timestamp, Clock) -> + case hold(PlanID, Batch, Timestamp, Clock) of + {ok, NewClock} -> + execute_plan(PlanID, Rest, Timestamp, NewClock); + {error, _} = Error -> + Error + end. + +construct_plan_change(PlanID, {BatchID, Cashflow}, Timestamp) -> + #shumaich_PostingPlanChange{ + id = PlanID, + creation_time = Timestamp, + batch = #shumaich_PostingBatch{ + id = BatchID, + postings = collect_postings(Cashflow) + } + }. + +construct_plan(PlanID, Batches, Timestamp) -> + #shumaich_PostingPlan{ + id = PlanID, + creation_time = Timestamp, + batch_list = [ + #shumaich_PostingBatch{ + id = BatchID, + postings = collect_postings(Cashflow) + } + || {BatchID, Cashflow} <- Batches + ] + }. + +collect_postings(Cashflow) -> + [ + #shumaich_Posting{ + from_account = #shumaich_Account{id = Source, currency_symbolic_code = CurrencyCode}, + to_account = #shumaich_Account{id = Destination, currency_symbolic_code = CurrencyCode}, + amount = Amount, + currency_symbolic_code = CurrencyCode, + description = construct_posting_description(Details) + } + || #domain_FinalCashFlowPosting{ + source = #domain_FinalCashFlowAccount{account_id = Source}, + destination = #domain_FinalCashFlowAccount{account_id = Destination}, + details = Details, + volume = #domain_Cash{ + amount = Amount, + currency = #domain_CurrencyRef{symbolic_code = CurrencyCode} + } + } <- Cashflow + ]. + +construct_posting_description(Details) when is_binary(Details) -> + Details; +construct_posting_description(undefined) -> + <<>>. + +%% + +construct_account( + AccountID, + #shumaich_Account{ + currency_symbolic_code = CurrencyCode + } +) -> + #{ + account_id => AccountID, + currency_code => CurrencyCode + }. + +construct_balance( + AccountID, + #shumaich_Balance{ + own_amount = OwnAmount, + min_available_amount = MinAvailableAmount, + max_available_amount = MaxAvailableAmount, + clock = Clock + } +) -> + genlib_map:compact(#{ + account_id => AccountID, + own_amount => OwnAmount, + min_available_amount => MinAvailableAmount, + max_available_amount => MaxAvailableAmount, + clock => to_domain_clock(Clock) + }). + +%% + +call_accounter(Function, Args) -> + hg_retry:apply( + fun() -> + case call_service(Function, Args) of + {ok, _} = Ok -> + {return, Ok}; + {error, ErrorType} = Error -> + {map_error_action(ErrorType), Error} + end + end, + get_retry_strategy(Function) + ). + +call_service(Function, Args) -> + case hg_woody_wrapper:call(accounter_new, Function, Args) of + {ok, _} = Ok -> + Ok; + {exception, Exception} -> + {error, map_exception(Exception)} + end. + +map_error_action(not_ready) -> + retry; +map_error_action(_) -> + return. + +map_exception(#shumaich_NotReady{}) -> + not_ready; +map_exception(#shumaich_AccountNotFound{account_id = AccountID}) -> + {account_not_found, AccountID}; +map_exception(#shumaich_PlanNotFound{plan_id = PlanID}) -> + {plan_not_found, PlanID}; +map_exception(#shumaich_InvalidPostingParams{wrong_postings = WrongPostings}) -> + {invalid_posting_params, WrongPostings}. + +to_domain_clock({latest, #shumaich_LatestClock{}}) -> + undefined; +to_domain_clock({vector, #shumaich_VectorClock{state = State}}) -> + {vector, #domain_VectorClock{state = State}}. + +to_accounter_clock(undefined) -> + {latest, #shumaich_LatestClock{}}; +to_accounter_clock({vector, #domain_VectorClock{state = State}}) -> + {vector, #shumaich_VectorClock{state = State}}. + +get_retry_strategy(Function) -> + PolicyConfig = genlib_app:env(hellgate, accounter_retry_policy, #{}), + hg_retry:new_strategy(maps:get(Function, PolicyConfig, ?DEFAULT_RETRY_STRATEGY)). diff --git a/apps/hellgate/src/hg_retry.erl b/apps/hellgate/src/hg_retry.erl index 4058cad9e..fdc5d1180 100644 --- a/apps/hellgate/src/hg_retry.erl +++ b/apps/hellgate/src/hg_retry.erl @@ -3,7 +3,8 @@ -export([ next_step/1, skip_steps/2, - new_strategy/1 + new_strategy/1, + apply/2 ]). -type retries_num() :: pos_integer() | infinity. @@ -58,4 +59,25 @@ skip_steps(Strategy, N) when N > 0 -> end, skip_steps(NewStrategy, N - 1). +-type retry_fun_result() :: any(). +-type retry_fun_action() :: retry | return. +-type retry_fun() :: fun(() -> {retry_fun_action(), retry_fun_result()}). + +-compile({no_auto_import, [apply/2]}). + +-spec apply(retry_fun(), strategy()) -> retry_fun_result(). +apply(Fun, Strategy) -> + case Fun() of + {return, Result} -> + Result; + {retry, Error} -> + case next_step(Strategy) of + {wait, Timeout, NextStrategy} -> + _ = timer:sleep(Timeout), + apply(Fun, NextStrategy); + finish -> + Error + end + end. + %%% Internal functions diff --git a/apps/hellgate/src/hg_utils.erl b/apps/hellgate/src/hg_utils.erl index d37a6baa4..318461da0 100644 --- a/apps/hellgate/src/hg_utils.erl +++ b/apps/hellgate/src/hg_utils.erl @@ -12,6 +12,12 @@ -export([format_reason/1]). +-export([gen_sequence/2]). +-export([gen_sequence/3]). + +-type sequence_params() :: #{minimum => integer()}. +-type woody_context() :: woody_context:ctx(). + %% -spec unique_id() -> dmsl_base_thrift:'ID'(). @@ -80,3 +86,13 @@ unwrap_result({error, E}) -> %% TODO: fix this dirty hack format_reason(V) -> genlib:to_binary(V). + +-spec gen_sequence(binary(), woody_context()) -> integer(). +gen_sequence(SequenceID, WoodyContext) -> + gen_sequence(SequenceID, WoodyContext, #{}). + +-spec gen_sequence(binary(), woody_context(), sequence_params()) -> integer(). +gen_sequence(SequenceID, WoodyContext, Params) -> + case bender_generator_client:gen_sequence(SequenceID, WoodyContext, Params) of + {ok, {_, ID}} -> ID + end. diff --git a/apps/hellgate/test/hg_ct_helper.erl b/apps/hellgate/test/hg_ct_helper.erl index e194432af..ee0578ccc 100644 --- a/apps/hellgate/test/hg_ct_helper.erl +++ b/apps/hellgate/test/hg_ct_helper.erl @@ -134,6 +134,7 @@ start_app(hellgate = AppName) -> }}, {services, #{ accounter => <<"http://shumway:8022/shumpune">>, + accounter_new => <<"http://shumway:8022/shumaich">>, automaton => <<"http://machinegun:8022/v1/automaton">>, customer_management => #{ url => <<"http://hellgate:8022/v1/processing/customer_management">>, diff --git a/apps/hellgate/test/hg_invoice_tests_SUITE.erl b/apps/hellgate/test/hg_invoice_tests_SUITE.erl index 7e4154de4..dce109c4e 100644 --- a/apps/hellgate/test/hg_invoice_tests_SUITE.erl +++ b/apps/hellgate/test/hg_invoice_tests_SUITE.erl @@ -4664,20 +4664,35 @@ consistent_account_balances(C) -> Party = hg_client_party:get(PartyClient), Shops = maps:values(Party#domain_Party.shops), _ = [ - consistent_account_balance(AccountID, Shop) + { + consistent_account_balance(AccountID, Shop), + consistent_account_balance_new(AccountID, Shop) + } || #domain_Shop{account = ShopAccount} = Shop <- Shops, #domain_ShopAccount{settlement = AccountID1, guarantee = AccountID2} <- [ShopAccount], AccountID <- [AccountID1, AccountID2] ]. consistent_account_balance(AccountID, Comment) -> - case hg_ct_helper:get_balance(AccountID) of + case hg_accounting:get_balance(AccountID) of #{own_amount := V, min_available_amount := V, max_available_amount := V} -> ok; #{} = Account -> erlang:error({"Inconsistent account balance", Account, Comment}) end. +consistent_account_balance_new(AccountID, Comment) -> + %% TODO: Switch to hg_accounting_new when all operations are migrated + try hg_accounting:get_balance(AccountID) of + #{own_amount := V, min_available_amount := V, max_available_amount := V} -> + ok; + #{} = Account -> + erlang:error({"Inconsistent account balance (new)", Account, Comment}) + catch + #payproc_AccountNotFound{} -> + ok + end. + %% next_event(InvoiceID, Client) -> diff --git a/apps/hg_proto/src/hg_proto.erl b/apps/hg_proto/src/hg_proto.erl index 189179e2f..b386e05c0 100644 --- a/apps/hg_proto/src/hg_proto.erl +++ b/apps/hg_proto/src/hg_proto.erl @@ -40,6 +40,8 @@ get_service(proxy_host_provider) -> {dmsl_proxy_provider_thrift, 'ProviderProxyHost'}; get_service(accounter) -> {shumpune_shumpune_thrift, 'Accounter'}; +get_service(accounter_new) -> + {shumaich_shumaich_thrift, 'Accounter'}; get_service(automaton) -> {mg_proto_state_processing_thrift, 'Automaton'}; get_service(processor) -> diff --git a/apps/party_management/src/party_management.app.src b/apps/party_management/src/party_management.app.src index 39679d000..a61eac9de 100644 --- a/apps/party_management/src/party_management.app.src +++ b/apps/party_management/src/party_management.app.src @@ -11,6 +11,7 @@ genlib, pm_proto, shumpune_proto, + shumaich_proto, cowboy, how_are_you, % must be after ranch and before any woody usage woody, @@ -20,6 +21,7 @@ woody_user_identity, payproc_errors, erl_health, + bender_client, cache ]}, {env, []}, diff --git a/config/sys.config b/config/sys.config index ba3e998a8..72c070dfc 100644 --- a/config/sys.config +++ b/config/sys.config @@ -51,6 +51,7 @@ automaton => "http://machinegun:8022/v1/automaton", eventsink => "http://machinegun:8022/v1/event_sink", accounter => "http://shumway:8022/shumpune", + accounter_new => "http://shumway:8022/shumaich", party_management => "http://hellgate:8022/v1/processing/partymgmt", customer_management => "http://hellgate:8022/v1/processing/customer_management", % TODO make more consistent diff --git a/docker-compose.sh b/docker-compose.sh index 403e2b0c6..bd5009b53 100755 --- a/docker-compose.sh +++ b/docker-compose.sh @@ -1,6 +1,6 @@ #!/bin/bash cat < + bash -c 'echo Waiting for Kafka to be ready... && + cub kafka-ready -b broker:9092 1 60 && + kafka-topics --create --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 --topic operation_log' + + shumaich: + image: dr2.rbkmoney.com/rbkmoney/shumaich:3be4048303d9a649027faa95d87a5ecd99af1e6b + hostname: shumaich + restart: on-failure + environment: + SPRING_APPLICATION_JSON: '{ + "rocksdb.name": "shumaich", + "rocksdb.dir": "/temp/rocksdb", + "kafka.bootstrap-servers": "broker:9092", + "kafka.topics.operation-log-name": "operation_log", + "management.metrics.export.statsd.enabled": "false" + }' depends_on: - - shumway-db + - broker + - kafka-setup + volumes: + - type: volume + target: /temp/rocksdb/shumaich + volume: + nocopy: true healthcheck: test: "curl http://localhost:8022/" interval: 5s timeout: 1s retries: 20 - shumway-db: - image: dr2.rbkmoney.com/rbkmoney/postgres:9.6 + postgres: + image: postgres:9.6 environment: - - POSTGRES_DB=shumway - - POSTGRES_USER=postgres - - POSTGRES_PASSWORD=postgres - - SERVICE_NAME=shumway-db + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: shumway + volumes: + - type: volume + target: /var/lib/postgresql/data/ + volume: + nocopy: true EOF diff --git a/rebar.config b/rebar.config index 6b1be24f0..f248bff85 100644 --- a/rebar.config +++ b/rebar.config @@ -38,6 +38,7 @@ {mg_proto, {git, "https://github.com/rbkmoney/machinegun_proto.git", {branch, "master"}}}, {shumpune_proto, {git, "https://github.com/rbkmoney/shumpune-proto.git", {ref, "a0aed3bdce6aafdb832bbcde45e6278222b08c0b"}}}, + {shumaich_proto, {git, "git@github.com:rbkmoney/shumaich-proto.git", {branch, "master"}}}, {dmt_client, {git, "https://github.com/rbkmoney/dmt_client.git", {branch, "master"}}}, {scoper, {git, "https://github.com/rbkmoney/scoper.git", {branch, "master"}}}, {party_client, {git, "https://github.com/rbkmoney/party_client_erlang.git", {branch, "master"}}}, @@ -99,7 +100,8 @@ ]}. {plugins, [ - {erlfmt, "0.10.0"} + {erlfmt, "0.10.0"}, + {rebar3_thrift_compiler, {git, "https://github.com/rbkmoney/rebar3_thrift_compiler.git", {tag, "0.3.1"}}} ]}. {erlfmt, [ diff --git a/rebar.lock b/rebar.lock index 75e09afa9..33e73bacb 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,6 +1,14 @@ {"1.2.0", [{<<"accept">>,{pkg,<<"accept">>,<<"0.3.5">>},2}, {<<"bear">>,{pkg,<<"bear">>,<<"0.8.7">>},2}, + {<<"bender_client">>, + {git,"git@github.com:rbkmoney/bender_client_erlang.git", + {ref,"3c1489a397dacd1e613b777834ab511023afad36"}}, + 0}, + {<<"bender_proto">>, + {git,"git@github.com:rbkmoney/bender-proto.git", + {ref,"0d5813b8a25c8d03e4e59e42aa5f4e9b785a3849"}}, + 1}, {<<"cache">>,{pkg,<<"cache">>,<<"2.3.3">>},0}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.3">>},2}, {<<"cg_mon">>, @@ -55,6 +63,10 @@ {ref,"d814d6948d4ff13f6f41d12c6613f59c805750b2"}}, 0}, {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},2}, + {<<"msgpack_proto">>, + {git,"git@github.com:rbkmoney/msgpack-proto.git", + {ref,"ec15d5e854ea60c58467373077d90c2faf6273d8"}}, + 2}, {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2}, {<<"party_client">>, {git,"https://github.com/rbkmoney/party_client_erlang.git", @@ -72,6 +84,10 @@ {git,"https://github.com/rbkmoney/scoper.git", {ref,"89a973bf3cedc5a48c9fd89d719d25e79fe10027"}}, 0}, + {<<"shumaich_proto">>, + {git,"git@github.com:rbkmoney/shumaich-proto.git", + {ref,"32f6ad35d6ed9e432aadbf66cbb075c23d71fbc1"}}, + 0}, {<<"shumpune_proto">>, {git,"https://github.com/rbkmoney/shumpune-proto.git", {ref,"a0aed3bdce6aafdb832bbcde45e6278222b08c0b"}},