diff --git a/README.md b/README.md index 78b2456..7f516ba 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,46 @@ Note that currently the `~PY` sigil does not work as part of Mix project code. This limitation is intentional, since in actual applications it is preferable to manage the Python globals explicitly. +## Python API + +Pythonx provides a Python module named `pythonx` with extra interoperability +features. + +### `pythonx.send_tagged_object(pid, tag, object)` + +Sends a Python object to an Elixir process identified by `pid`. + +The Elixir process receives the message as a `{tag, object}` tuple, +where `tag` is an atom and `object` is a `Pythonx.Object` struct. + +> #### Long-running evaluation {: .warning} +> +> If you are sending messages from Python to Elixir, it likely means +> you have a long-running Python evaluation. If the evaluation holds +> onto the GIL for long, you should make sure to only do it from a single +> Elixir process to avoid bottlenecks. For more details see the +> "Concurrency" notes in `Pythonx.eval/3`. + +> #### Decoding {: .warning} +> +> The Elixir process receives a `Pythonx.Object`, which you may want +> to decode right away. Keep in mind that `Pythonx.decode/1` requires +> the GIL, so if the ongoing evaluation holds onto the GIL for long, decoding +> itself may be blocked. + +**Parameters:** + +- `pid` (`pythonx.PID`) – Opaque PID object, passed into the evaluation. +- `tag` (`str`) – A tag appearing as an atom in the Elixir message. +- `object` (`Any`) – Any Python object to be sent as the message. + +### `pythonx.PID` + +Opaque Python object that represents an Elixir PID. + +This object cannot be created within Python, it needs to be passed +into the evaluation as part of globals. 
+ ## How it works [CPython](https://github.com/python/cpython) (the reference diff --git a/c_src/pythonx.cpp b/c_src/pythonx.cpp index f694e71..72663fb 100644 --- a/c_src/pythonx.cpp +++ b/c_src/pythonx.cpp @@ -15,6 +15,11 @@ extern "C" void pythonx_handle_io_write(const char *message, const char *eval_info_bytes, bool type); +extern "C" void +pythonx_handle_send_tagged_object(const char *pid_bytes, const char *tag, + pythonx::python::PyObjectPtr *py_object, + const char *eval_info_bytes); + namespace pythonx { using namespace python; @@ -385,36 +390,46 @@ import ctypes import io import sys import inspect +import types +import sys pythonx_handle_io_write = ctypes.CFUNCTYPE( None, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_bool )(pythonx_handle_io_write_ptr) +pythonx_handle_send_tagged_object = ctypes.CFUNCTYPE( + None, ctypes.c_char_p, ctypes.c_char_p, ctypes.py_object, ctypes.c_char_p +)(pythonx_handle_send_tagged_object_ptr) + + +def get_eval_info_bytes(): + # The evaluation caller has __pythonx_eval_info_bytes__ set in + # their globals. It is not available in globals() here, because + # the globals dict in function definitions is fixed at definition + # time. To find the current evaluation globals, we look at the + # call stack using the inspect module and find the caller with + # __pythonx_eval_info_bytes__ in globals. We look specifically + # for the outermost caller, because intermediate functions could + # be defined by previous evaluations, in which case they would + # have __pythonx_eval_info_bytes__ in their globals, corresponding + # to that previous evaluation. When called within a thread, the + # evaluation caller is not in the stack, so __pythonx_eval_info_bytes__ + # will be found in the thread entrypoint function globals. 
+ call_stack = inspect.stack() + eval_info_bytes = next( + frame_info.frame.f_globals["__pythonx_eval_info_bytes__"] + for frame_info in reversed(call_stack) + if "__pythonx_eval_info_bytes__" in frame_info.frame.f_globals + ) + return eval_info_bytes + class Stdout(io.TextIOBase): def __init__(self, type): self.type = type def write(self, string): - # The evaluation caller has __pythonx_eval_info_bytes__ set in - # their globals. It is not available in globals() here, because - # the globals dict in function definitions is fixed at definition - # time. To find the current evaluation globals, we look at the - # call stack using the inspect module and find the caller with - # __pythonx_eval_info_bytes__ in globals. We look specifically - # for the outermost caller, because intermediate functions could - # be defined by previous evaluations, in which case they would - # have __pythonx_eval_info_bytes__ in their globals, corresponding - # to that previous evaluation. When called within a thread, the - # evaluation caller is not in the stack, so __pythonx_eval_info_bytes__ - # will be found in the thread entrypoint function globals. 
- call_stack = inspect.stack() - eval_info_bytes = next( - frame_info.frame.f_globals["__pythonx_eval_info_bytes__"] - for frame_info in reversed(call_stack) - if "__pythonx_eval_info_bytes__" in frame_info.frame.f_globals - ) - pythonx_handle_io_write(string.encode("utf-8"), eval_info_bytes, self.type) + pythonx_handle_io_write(string.encode("utf-8"), get_eval_info_bytes(), self.type) return len(string) @@ -426,6 +441,24 @@ class Stdin(io.IOBase): sys.stdout = Stdout(0) sys.stderr = Stdout(1) sys.stdin = Stdin() + +pythonx = types.ModuleType("pythonx") + +class PID: + def __init__(self, bytes): + self.bytes = bytes + + def __repr__(self): + return "" + +pythonx.PID = PID + +def send_tagged_object(pid, tag, object): + pythonx_handle_send_tagged_object(pid.bytes, tag.encode("utf-8"), object, get_eval_info_bytes()) + +pythonx.send_tagged_object = send_tagged_object + +sys.modules["pythonx"] = pythonx )"; auto py_code = PyUnicode_FromStringAndSize(code, sizeof(code) - 1); @@ -449,6 +482,16 @@ sys.stdin = Stdin() "pythonx_handle_io_write_ptr", py_pythonx_handle_io_write_ptr)); + auto py_pythonx_handle_send_tagged_object_ptr = PyLong_FromUnsignedLongLong( + reinterpret_cast(pythonx_handle_send_tagged_object)); + raise_if_failed(env, py_pythonx_handle_send_tagged_object_ptr); + auto py_pythonx_handle_send_tagged_object_ptr_guard = + PyDecRefGuard(py_pythonx_handle_send_tagged_object_ptr); + + raise_if_failed(env, PyDict_SetItemString( + py_globals, "pythonx_handle_send_tagged_object_ptr", + py_pythonx_handle_send_tagged_object_ptr)); + auto py_exec_args = PyTuple_Pack(2, py_code, py_globals); raise_if_failed(env, py_exec_args); auto py_exec_args_guard = PyDecRefGuard(py_exec_args); @@ -699,6 +742,37 @@ fine::Ok<> set_add(ErlNifEnv *env, ExObject ex_object, ExObject ex_key) { FINE_NIF(set_add, ERL_NIF_DIRTY_JOB_CPU_BOUND); +ExObject pid_new(ErlNifEnv *env, ErlNifPid pid) { + ensure_initialized(); + auto gil_guard = PyGILGuard(); + + // ErlNifPid is self-contained struct, 
not bound to any env, so it's + // safe to copy [1]. + // + // [1]: https://www.erlang.org/doc/apps/erts/erl_nif.html#ErlNifPid + auto py_pid_bytes = PyBytes_FromStringAndSize( + reinterpret_cast(&pid), sizeof(ErlNifPid)); + raise_if_failed(env, py_pid_bytes); + + auto py_pythonx = PyImport_AddModule("pythonx"); + raise_if_failed(env, py_pythonx); + + auto py_PID = PyObject_GetAttrString(py_pythonx, "PID"); + raise_if_failed(env, py_PID); + auto py_PID_guard = PyDecRefGuard(py_PID); + + auto py_PID_args = PyTuple_Pack(1, py_pid_bytes); + raise_if_failed(env, py_PID_args); + auto py_PID_args_guard = PyDecRefGuard(py_PID_args); + + auto py_pid = PyObject_Call(py_PID, py_PID_args, NULL); + raise_if_failed(env, py_pid); + + return ExObject(fine::make_resource(py_pid)); +} + +FINE_NIF(pid_new, ERL_NIF_DIRTY_JOB_CPU_BOUND); + ExObject object_repr(ErlNifEnv *env, ExObject ex_object) { ensure_initialized(); auto gil_guard = PyGILGuard(); @@ -962,6 +1036,31 @@ fine::Term decode_once(ErlNifEnv *env, ExObject ex_object) { std::make_tuple(atoms::map_set, fine::Term(items))); } + auto py_pythonx = PyImport_AddModule("pythonx"); + raise_if_failed(env, py_pythonx); + + auto py_PID = PyObject_GetAttrString(py_pythonx, "PID"); + raise_if_failed(env, py_PID); + auto py_PID_guard = PyDecRefGuard(py_PID); + + auto is_pid = PyObject_IsInstance(py_object, py_PID); + raise_if_failed(env, is_pid); + if (is_pid) { + auto py_pid_bytes = PyObject_GetAttrString(py_object, "bytes"); + raise_if_failed(env, py_pid_bytes); + auto py_pid_bytes_guard = PyDecRefGuard(py_pid_bytes); + + Py_ssize_t size; + char *pid_bytes; + auto result = PyBytes_AsStringAndSize(py_pid_bytes, &pid_bytes, &size); + raise_if_failed(env, result); + + auto pid = ErlNifPid{}; + std::memcpy(&pid, pid_bytes, sizeof(ErlNifPid)); + + return fine::encode(env, pid); + } + // None of the built-ins, return %Pythonx.Object{} as is return fine::encode(env, ex_object); } @@ -1368,16 +1467,16 @@ FINE_INIT("Elixir.Pythonx.NIF"); // 
Below are functions we call from Python code -extern "C" void pythonx_handle_io_write(const char *message, - const char *eval_info_bytes, - bool type) { +pythonx::EvalInfo eval_info_from_bytes(const char *eval_info_bytes) { // Note that we allocate EvalInfo first, so it will have the proper // alignment and memcpy simply restores the original struct state. auto eval_info = pythonx::EvalInfo{}; std::memcpy(&eval_info, eval_info_bytes, sizeof(pythonx::EvalInfo)); - auto env = enif_alloc_env(); + return eval_info; +} +ErlNifEnv *get_caller_env(pythonx::EvalInfo eval_info) { // The enif_whereis_pid and enif_send functions require passing the // caller env. Stdout write may be called by the evaluated code from // the NIF call, but it may also be called by a Python thread, after @@ -1387,6 +1486,17 @@ extern "C" void pythonx_handle_io_write(const char *message, bool is_main_thread = std::this_thread::get_id() == eval_info.thread_id; auto caller_env = is_main_thread ? eval_info.env : NULL; + return caller_env; +} + +extern "C" void pythonx_handle_io_write(const char *message, + const char *eval_info_bytes, + bool type) { + auto eval_info = eval_info_from_bytes(eval_info_bytes); + + auto env = enif_alloc_env(); + auto caller_env = get_caller_env(eval_info); + // Note that we send the output to Pythonx.Janitor and it then sends // it to the device. We do this to avoid IO replies being sent to // the calling Elixir process (which would be unexpected). 
Additionally, @@ -1406,3 +1516,24 @@ extern "C" void pythonx_handle_io_write(const char *message, << std::endl; } } + +extern "C" void +pythonx_handle_send_tagged_object(const char *pid_bytes, const char *tag, + pythonx::python::PyObjectPtr *py_object, + const char *eval_info_bytes) { + auto eval_info = eval_info_from_bytes(eval_info_bytes); + + auto caller_env = get_caller_env(eval_info); + auto env = enif_alloc_env(); + + auto pid = ErlNifPid{}; + std::memcpy(&pid, pid_bytes, sizeof(ErlNifPid)); + + auto msg = fine::encode( + env, std::make_tuple( + fine::Atom(tag), + pythonx::ExObject( + fine::make_resource(py_object)))); + enif_send(caller_env, &pid, env, msg); + enif_free_env(env); +} diff --git a/lib/pythonx.ex b/lib/pythonx.ex index 156cc1c..4600022 100644 --- a/lib/pythonx.ex +++ b/lib/pythonx.ex @@ -454,6 +454,7 @@ defmodule Pythonx do * `dict` * `set` * `frozenset` + * `pythonx.PID` For all other types `Pythonx.Object` is returned. diff --git a/lib/pythonx/encoder.ex b/lib/pythonx/encoder.ex index 2ec44bc..919f84f 100644 --- a/lib/pythonx/encoder.ex +++ b/lib/pythonx/encoder.ex @@ -196,3 +196,9 @@ defimpl Pythonx.Encoder, for: MapSet do set end end + +defimpl Pythonx.Encoder, for: PID do + def encode(term, _encoder) do + Pythonx.NIF.pid_new(term) + end +end diff --git a/lib/pythonx/nif.ex b/lib/pythonx/nif.ex index 736d686..6cf1e4e 100644 --- a/lib/pythonx/nif.ex +++ b/lib/pythonx/nif.ex @@ -31,6 +31,7 @@ defmodule Pythonx.NIF do def list_set_item(_object, _index, _value), do: err!() def set_new(), do: err!() def set_add(_object, _key), do: err!() + def pid_new(_pid), do: err!() def object_repr(_object), do: err!() def format_exception(_error), do: err!() def decode_once(_object), do: err!() diff --git a/test/pythonx_test.exs b/test/pythonx_test.exs index b4b04bd..84fdee0 100644 --- a/test/pythonx_test.exs +++ b/test/pythonx_test.exs @@ -59,6 +59,10 @@ defmodule PythonxTest do assert repr(Pythonx.encode!(MapSet.new([1]))) == "{1}" end + test "pid" do + 
assert repr(Pythonx.encode!(IEx.Helpers.pid(0, 1, 2))) == "" + end + test "identity for Pythonx.Object" do object = Pythonx.encode!(1) assert Pythonx.encode!(object) == object @@ -132,6 +136,12 @@ defmodule PythonxTest do assert Pythonx.decode(eval_result("frozenset({1})")) == MapSet.new([1]) end + test "pid" do + pid = IEx.Helpers.pid(0, 1, 2) + assert {result, %{}} = Pythonx.eval("pid", %{"pid" => pid}) + assert Pythonx.decode(result) == pid + end + test "identity for other objects" do assert repr(Pythonx.decode(eval_result("complex(1)"))) == "(1+0j)" end @@ -449,6 +459,24 @@ defmodule PythonxTest do end end + describe "python API" do + test "pythonx.send sends message to the given pid" do + pid = self() + + assert {_result, %{}} = + Pythonx.eval( + """ + import pythonx + pythonx.send_tagged_object(pid, "message_from_python", ("hello", 1)) + """, + %{"pid" => pid} + ) + + assert_receive {:message_from_python, %Pythonx.Object{} = object} + assert repr(object) == "('hello', 1)" + end + end + defp repr(object) do assert %Pythonx.Object{} = object