40 changes: 40 additions & 0 deletions README.md
@@ -150,6 +150,46 @@ Note that currently the `~PY` sigil does not work as part of Mix project
code. This limitation is intentional, since in actual applications it
is preferable to manage the Python globals explicitly.

## Python API

Pythonx provides a Python module named `pythonx` with extra interoperability
features.

### `pythonx.send_tagged_object(pid, tag, object)`

Sends a Python object to an Elixir process identified by `pid`.

The Elixir process receives the message as a `{tag, object}` tuple,
where `tag` is an atom and `object` is a `Pythonx.Object` struct.

> #### Long-running evaluation {: .warning}
>
> If you are sending messages from Python to Elixir, it likely means
> you have a long-running Python evaluation. If the evaluation holds
> the GIL for a long time, make sure to run it from a single Elixir
> process only, to avoid bottlenecks. For more details, see the
> "Concurrency" notes in `Pythonx.eval/3`.

> #### Decoding {: .warning}
>
> The Elixir process receives a `Pythonx.Object`, which you may want
> to decode right away. Keep in mind that `Pythonx.decode/1` requires
> the GIL, so if the ongoing evaluation holds the GIL for a long time,
> decoding itself may be blocked.

**Parameters:**

- `pid` (`pythonx.PID`) – Opaque PID object, passed into the evaluation.
- `tag` (`str`) – A tag appearing as an atom in the Elixir message.
- `object` (`Any`) – Any Python object to be sent as the message.
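
A minimal sketch of how this function might be used from evaluated Python code. The `report_progress` helper and its `send` parameter are hypothetical names introduced for illustration. Inside a Pythonx evaluation you would pass `pythonx.send_tagged_object` as `send`, together with a `pid` received through globals. A stand-in sender is used here so the snippet also runs outside Pythonx.

```python
def report_progress(pid, items, send):
    """Process items, sending one tagged message per item.

    `send` is expected to have the same shape as
    pythonx.send_tagged_object(pid, tag, object).
    """
    total = len(items)
    for index, item in enumerate(items, start=1):
        # With the real sender, each call would arrive in Elixir as
        # {:progress, %Pythonx.Object{}}.
        send(pid, "progress", (index, total, item))
    send(pid, "done", total)


# Stand-in sender (assumption): records messages instead of
# delivering them to an Elixir process.
sent = []
report_progress("fake-pid", ["a", "b"], lambda pid, tag, obj: sent.append((tag, obj)))
```

Taking the sender as a parameter keeps the helper testable without an Elixir process on the other side.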

### `pythonx.PID`

Opaque Python object that represents an Elixir PID.

This object cannot be created within Python; it needs to be passed
into the evaluation as part of globals.

## How it works

[CPython](https://github.com/python/cpython) (the reference
177 changes: 154 additions & 23 deletions c_src/pythonx.cpp
@@ -15,6 +15,11 @@
extern "C" void pythonx_handle_io_write(const char *message,
const char *eval_info_bytes, bool type);

extern "C" void
pythonx_handle_send_tagged_object(const char *pid_bytes, const char *tag,
pythonx::python::PyObjectPtr *py_object,
const char *eval_info_bytes);

namespace pythonx {

using namespace python;
@@ -385,36 +390,46 @@ import ctypes
import io
import sys
import inspect
import types

pythonx_handle_io_write = ctypes.CFUNCTYPE(
    None, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_bool
)(pythonx_handle_io_write_ptr)

pythonx_handle_send_tagged_object = ctypes.CFUNCTYPE(
    None, ctypes.c_char_p, ctypes.c_char_p, ctypes.py_object, ctypes.c_char_p
)(pythonx_handle_send_tagged_object_ptr)


def get_eval_info_bytes():
    # The evaluation caller has __pythonx_eval_info_bytes__ set in
    # their globals. It is not available in globals() here, because
    # the globals dict in function definitions is fixed at definition
    # time. To find the current evaluation globals, we look at the
    # call stack using the inspect module and find the caller with
    # __pythonx_eval_info_bytes__ in globals. We look specifically
    # for the outermost caller, because intermediate functions could
    # be defined by previous evaluations, in which case they would
    # have __pythonx_eval_info_bytes__ in their globals, corresponding
    # to that previous evaluation. When called within a thread, the
    # evaluation caller is not in the stack, so __pythonx_eval_info_bytes__
    # will be found in the thread entrypoint function globals.
    call_stack = inspect.stack()
    eval_info_bytes = next(
        frame_info.frame.f_globals["__pythonx_eval_info_bytes__"]
        for frame_info in reversed(call_stack)
        if "__pythonx_eval_info_bytes__" in frame_info.frame.f_globals
    )
    return eval_info_bytes


class Stdout(io.TextIOBase):
    def __init__(self, type):
        self.type = type

    def write(self, string):
        pythonx_handle_io_write(string.encode("utf-8"), get_eval_info_bytes(), self.type)
        return len(string)
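
The outermost-caller lookup described in the `get_eval_info_bytes` comment can be tried in isolation. The following is a sketch only: `MARKER`, `find_marker`, and `evaluate` are hypothetical names, and `MARKER` stands in for `__pythonx_eval_info_bytes__`. It shows why walking the stack from the outermost frame matters when a function defined by an earlier evaluation is called from a later one.

```python
import inspect

# Hypothetical stand-in for __pythonx_eval_info_bytes__.
MARKER = "__eval_marker__"


def find_marker(outermost=True):
    """Return the marker from the first frame whose globals define it.

    inspect.stack() lists the current frame first, so reversing it
    walks from the outermost caller inwards.
    """
    stack = inspect.stack(context=0)
    frames = reversed(stack) if outermost else stack
    for frame_info in frames:
        if MARKER in frame_info.frame.f_globals:
            return frame_info.frame.f_globals[MARKER]
    raise LookupError("no evaluation marker on the call stack")


def evaluate(code, marker, extra_globals=None):
    """Run code with fresh globals that carry the given marker."""
    eval_globals = {MARKER: marker, "find_marker": find_marker}
    eval_globals.update(extra_globals or {})
    exec(code, eval_globals)
    return eval_globals


# The first evaluation defines a helper; its globals keep marker "first".
first = evaluate("def helper(outermost):\n    return find_marker(outermost)", "first")

# The second evaluation calls that helper under marker "second".
second = evaluate(
    "outer = helper(True)\ninner = helper(False)",
    "second",
    {"helper": first["helper"]},
)
```

Here `second["outer"]` resolves to the marker of the evaluation that is actually running, while the innermost-first lookup in `second["inner"]` picks up the stale marker from the evaluation that defined `helper`.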


@@ -426,6 +441,24 @@ class Stdin(io.IOBase):
sys.stdout = Stdout(0)
sys.stderr = Stdout(1)
sys.stdin = Stdin()

pythonx = types.ModuleType("pythonx")

class PID:
    def __init__(self, bytes):
        self.bytes = bytes

    def __repr__(self):
        return "<pythonx.PID>"

pythonx.PID = PID

def send_tagged_object(pid, tag, object):
    pythonx_handle_send_tagged_object(pid.bytes, tag.encode("utf-8"), object, get_eval_info_bytes())

pythonx.send_tagged_object = send_tagged_object

sys.modules["pythonx"] = pythonx
)";

auto py_code = PyUnicode_FromStringAndSize(code, sizeof(code) - 1);
@@ -449,6 +482,16 @@ sys.stdin = Stdin()
"pythonx_handle_io_write_ptr",
py_pythonx_handle_io_write_ptr));

auto py_pythonx_handle_send_tagged_object_ptr = PyLong_FromUnsignedLongLong(
reinterpret_cast<uintptr_t>(pythonx_handle_send_tagged_object));
raise_if_failed(env, py_pythonx_handle_send_tagged_object_ptr);
auto py_pythonx_handle_send_tagged_object_ptr_guard =
PyDecRefGuard(py_pythonx_handle_send_tagged_object_ptr);

raise_if_failed(env, PyDict_SetItemString(
py_globals, "pythonx_handle_send_tagged_object_ptr",
py_pythonx_handle_send_tagged_object_ptr));

auto py_exec_args = PyTuple_Pack(2, py_code, py_globals);
raise_if_failed(env, py_exec_args);
auto py_exec_args_guard = PyDecRefGuard(py_exec_args);
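
The hunk above exposes the address of the C handler as a Python integer, and the embedded script turns that integer into a callable with `ctypes.CFUNCTYPE`. The sketch below reproduces that mechanism in plain Python, assuming a shortened two-argument prototype. `HANDLER`, `record`, and `handler_ptr` are illustrative names, and the function pointer comes from a Python callback rather than from C++.

```python
import ctypes

# Prototype loosely mirroring the send handler: (tag bytes, any Python object).
HANDLER = ctypes.CFUNCTYPE(None, ctypes.c_char_p, ctypes.py_object)

received = []


def record(tag, obj):
    # c_char_p arrives as bytes, py_object as the original object.
    received.append((tag, obj))


# Wrapping a Python function yields a C-callable function pointer. The
# wrapper must stay referenced for as long as the pointer is in use.
native_record = HANDLER(record)

# Stand-in for the integer that the C++ side stores in the globals dict.
handler_ptr = ctypes.cast(native_record, ctypes.c_void_p).value

# Same shape as CFUNCTYPE(...)(pythonx_handle_send_tagged_object_ptr).
handler = HANDLER(handler_ptr)
handler(b"message_from_python", ("hello", 1))
```

Passing the object as `ctypes.py_object` hands the callee the object pointer itself, which is why arbitrary Python values can cross the boundary without serialization.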
@@ -699,6 +742,37 @@ fine::Ok<> set_add(ErlNifEnv *env, ExObject ex_object, ExObject ex_key) {

FINE_NIF(set_add, ERL_NIF_DIRTY_JOB_CPU_BOUND);

ExObject pid_new(ErlNifEnv *env, ErlNifPid pid) {
ensure_initialized();
auto gil_guard = PyGILGuard();

// ErlNifPid is a self-contained struct, not bound to any env, so it's
// safe to copy [1].
//
// [1]: https://www.erlang.org/doc/apps/erts/erl_nif.html#ErlNifPid
auto py_pid_bytes = PyBytes_FromStringAndSize(
reinterpret_cast<const char *>(&pid), sizeof(ErlNifPid));
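raise_if_failed(env, py_pid_bytes);
// PyTuple_Pack takes its own reference below, so release ours on exit.
auto py_pid_bytes_guard = PyDecRefGuard(py_pid_bytes);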

auto py_pythonx = PyImport_AddModule("pythonx");
raise_if_failed(env, py_pythonx);

auto py_PID = PyObject_GetAttrString(py_pythonx, "PID");
raise_if_failed(env, py_PID);
auto py_PID_guard = PyDecRefGuard(py_PID);

auto py_PID_args = PyTuple_Pack(1, py_pid_bytes);
raise_if_failed(env, py_PID_args);
auto py_PID_args_guard = PyDecRefGuard(py_PID_args);

auto py_pid = PyObject_Call(py_PID, py_PID_args, NULL);
raise_if_failed(env, py_pid);

return ExObject(fine::make_resource<ExObjectResource>(py_pid));
}

FINE_NIF(pid_new, ERL_NIF_DIRTY_JOB_CPU_BOUND);

ExObject object_repr(ErlNifEnv *env, ExObject ex_object) {
ensure_initialized();
auto gil_guard = PyGILGuard();
@@ -962,6 +1036,31 @@ fine::Term decode_once(ErlNifEnv *env, ExObject ex_object) {
std::make_tuple(atoms::map_set, fine::Term(items)));
}

auto py_pythonx = PyImport_AddModule("pythonx");
raise_if_failed(env, py_pythonx);

auto py_PID = PyObject_GetAttrString(py_pythonx, "PID");
raise_if_failed(env, py_PID);
auto py_PID_guard = PyDecRefGuard(py_PID);

auto is_pid = PyObject_IsInstance(py_object, py_PID);
raise_if_failed(env, is_pid);
if (is_pid) {
auto py_pid_bytes = PyObject_GetAttrString(py_object, "bytes");
raise_if_failed(env, py_pid_bytes);
auto py_pid_bytes_guard = PyDecRefGuard(py_pid_bytes);

Py_ssize_t size;
char *pid_bytes;
auto result = PyBytes_AsStringAndSize(py_pid_bytes, &pid_bytes, &size);
raise_if_failed(env, result);

auto pid = ErlNifPid{};
std::memcpy(&pid, pid_bytes, sizeof(ErlNifPid));

return fine::encode(env, pid);
}

// None of the built-ins, return %Pythonx.Object{} as is
return fine::encode(env, ex_object);
}
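
`pid_new` and the PID branch of `decode_once` together form a round trip: the raw bytes of a fixed-size struct are stored on an opaque Python object and later copied back into a fresh struct. A rough Python analogue is sketched below. `FakePid` is a hypothetical struct standing in for `ErlNifPid`, whose real layout is not assumed here, and the size check is an addition of this sketch (the C++ code copies `sizeof(ErlNifPid)` bytes without comparing lengths).

```python
import ctypes


class FakePid(ctypes.Structure):
    """Hypothetical fixed-size struct standing in for ErlNifPid."""

    _fields_ = [("term", ctypes.c_uint64)]


class PID:
    """Opaque wrapper holding only the raw struct bytes."""

    def __init__(self, raw):
        self.bytes = raw

    def __repr__(self):
        return "<PID>"


def pid_to_python(pid):
    # Like PyBytes_FromStringAndSize(&pid, sizeof(ErlNifPid)).
    return PID(bytes(pid))


def pid_from_python(py_pid):
    if len(py_pid.bytes) != ctypes.sizeof(FakePid):
        raise ValueError("unexpected PID payload size")
    # Like memcpy into a freshly created struct.
    return FakePid.from_buffer_copy(py_pid.bytes)


original = FakePid(term=0x1234)
restored = pid_from_python(pid_to_python(original))
```

The round trip only works because the struct is self-contained, which is the property the `pid_new` comment relies on for `ErlNifPid`.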
@@ -1368,16 +1467,16 @@ FINE_INIT("Elixir.Pythonx.NIF");

// Below are functions we call from Python code

pythonx::EvalInfo eval_info_from_bytes(const char *eval_info_bytes) {
// Note that we allocate EvalInfo first, so it will have the proper
// alignment and memcpy simply restores the original struct state.
auto eval_info = pythonx::EvalInfo{};
std::memcpy(&eval_info, eval_info_bytes, sizeof(pythonx::EvalInfo));

return eval_info;
}

ErlNifEnv *get_caller_env(pythonx::EvalInfo eval_info) {
// The enif_whereis_pid and enif_send functions require passing the
// caller env. Stdout write may be called by the evaluated code from
// the NIF call, but it may also be called by a Python thread, after
@@ -1387,6 +1486,17 @@ extern "C" void pythonx_handle_io_write(const char *message,
bool is_main_thread = std::this_thread::get_id() == eval_info.thread_id;
auto caller_env = is_main_thread ? eval_info.env : NULL;

return caller_env;
}

extern "C" void pythonx_handle_io_write(const char *message,
const char *eval_info_bytes,
bool type) {
auto eval_info = eval_info_from_bytes(eval_info_bytes);

auto env = enif_alloc_env();
auto caller_env = get_caller_env(eval_info);

// Note that we send the output to Pythonx.Janitor and it then sends
// it to the device. We do this to avoid IO replies being sent to
// the calling Elixir process (which would be unexpected). Additionally,
@@ -1406,3 +1516,24 @@ extern "C" void pythonx_handle_io_write(const char *message,
<< std::endl;
}
}

extern "C" void
pythonx_handle_send_tagged_object(const char *pid_bytes, const char *tag,
pythonx::python::PyObjectPtr *py_object,
const char *eval_info_bytes) {
auto eval_info = eval_info_from_bytes(eval_info_bytes);

auto caller_env = get_caller_env(eval_info);
auto env = enif_alloc_env();

auto pid = ErlNifPid{};
std::memcpy(&pid, pid_bytes, sizeof(ErlNifPid));

auto msg = fine::encode(
env, std::make_tuple(
fine::Atom(tag),
pythonx::ExObject(
fine::make_resource<pythonx::ExObjectResource>(py_object))));
enif_send(caller_env, &pid, env, msg);
enif_free_env(env);
}
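
Both handlers now share `get_caller_env`, which passes the evaluation's env only when the call happens on the thread that started the evaluation, and `NULL` otherwise. The sketch below mirrors that decision in Python with `threading.get_ident()`. The names and the `object()` placeholder for the env are assumptions of this sketch and are not part of Pythonx.

```python
import threading


def get_caller_env(eval_thread_id, eval_env):
    """Return eval_env only when running on the evaluation thread."""
    is_main_thread = threading.get_ident() == eval_thread_id
    return eval_env if is_main_thread else None


eval_thread_id = threading.get_ident()
eval_env = object()  # stand-in for the NIF caller env

# Called from the thread that started the "evaluation".
same_thread = get_caller_env(eval_thread_id, eval_env)

# Called from a thread spawned during the "evaluation".
results = []
worker = threading.Thread(
    target=lambda: results.append(get_caller_env(eval_thread_id, eval_env))
)
worker.start()
worker.join()
```

The thread id recorded at the start of the evaluation acts as the single source of truth, so a send issued from a Python thread never reuses an env that belongs to the NIF call.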
1 change: 1 addition & 0 deletions lib/pythonx.ex
@@ -454,6 +454,7 @@ defmodule Pythonx do
* `dict`
* `set`
* `frozenset`
* `pythonx.PID`

For all other types `Pythonx.Object` is returned.

6 changes: 6 additions & 0 deletions lib/pythonx/encoder.ex
@@ -196,3 +196,9 @@ defimpl Pythonx.Encoder, for: MapSet do
set
end
end

defimpl Pythonx.Encoder, for: PID do
def encode(term, _encoder) do
Pythonx.NIF.pid_new(term)
end
end
1 change: 1 addition & 0 deletions lib/pythonx/nif.ex
@@ -31,6 +31,7 @@ defmodule Pythonx.NIF do
def list_set_item(_object, _index, _value), do: err!()
def set_new(), do: err!()
def set_add(_object, _key), do: err!()
def pid_new(_pid), do: err!()
def object_repr(_object), do: err!()
def format_exception(_error), do: err!()
def decode_once(_object), do: err!()
28 changes: 28 additions & 0 deletions test/pythonx_test.exs
@@ -59,6 +59,10 @@ defmodule PythonxTest do
assert repr(Pythonx.encode!(MapSet.new([1]))) == "{1}"
end

test "pid" do
assert repr(Pythonx.encode!(IEx.Helpers.pid(0, 1, 2))) == "<pythonx.PID>"
end

test "identity for Pythonx.Object" do
object = Pythonx.encode!(1)
assert Pythonx.encode!(object) == object
@@ -132,6 +136,12 @@ defmodule PythonxTest do
assert Pythonx.decode(eval_result("frozenset({1})")) == MapSet.new([1])
end

test "pid" do
pid = IEx.Helpers.pid(0, 1, 2)
assert {result, %{}} = Pythonx.eval("pid", %{"pid" => pid})
assert Pythonx.decode(result) == pid
end

test "identity for other objects" do
assert repr(Pythonx.decode(eval_result("complex(1)"))) == "(1+0j)"
end
@@ -449,6 +459,24 @@ defmodule PythonxTest do
end
end

describe "python API" do
test "pythonx.send_tagged_object sends message to the given pid" do
pid = self()

assert {_result, %{}} =
Pythonx.eval(
"""
import pythonx
pythonx.send_tagged_object(pid, "message_from_python", ("hello", 1))
""",
%{"pid" => pid}
)

assert_receive {:message_from_python, %Pythonx.Object{} = object}
assert repr(object) == "('hello', 1)"
end
end

defp repr(object) do
assert %Pythonx.Object{} = object
