diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml deleted file mode 100644 index 92014b84..00000000 --- a/.github/workflows/build.yml +++ /dev/null @@ -1,189 +0,0 @@ -name: build - -on: - pull_request: - push: - branches: - - master - tags: - - '*' - -jobs: - - build_openmp: - runs-on: ${{ matrix.os }} - defaults: - run: - shell: bash -l {0} - strategy: - fail-fast: false - matrix: - os: [ ubuntu-latest ] - python-version: [ 3.9, "3.10", "3.11" ] - - env: - PYTHON_VERSION: ${{ matrix.python-version }} - - steps: - - uses: actions/checkout@v4 - with: - submodules: recursive - - uses: conda-incubator/setup-miniconda@v3 - with: - auto-update-conda: true - python-version: ${{ matrix.python-version }} - miniforge-version: latest - channels: conda-forge,anaconda - - name: Install dependencies - run: | - conda install scipy numpy pytest 'setuptools<=60' - - name: Test - run: | - python legacy_setup.py install --scs --openmp - pytest - rm -rf build/ - - build_mkl: - runs-on: ${{ matrix.os }} - defaults: - run: - shell: bash -l {0} - strategy: - fail-fast: false - matrix: - # macos-13 runners have intel chips. macos-14 and above - # runners have Apple silicon chips. - os: [ ubuntu-latest, macos-13, windows-latest ] - python-version: [ 3.9, "3.10", "3.11", "3.12", "3.13"] - link_mkl: [true] - - env: - PYTHON_VERSION: ${{ matrix.python-version }} - LINK_MKL: ${{ matrix.link_mkl }} - - steps: - - uses: actions/checkout@v4 - with: - submodules: recursive - - - name: Set Additional Envs - shell: bash - run: | - echo "PYTHON_SUBVERSION=$(echo $PYTHON_VERSION | cut -c 3-)" >> $GITHUB_ENV - - uses: conda-incubator/setup-miniconda@v3 - with: - auto-update-conda: true - python-version: ${{ matrix.python-version }} - miniforge-version: latest - channels: conda-forge,anaconda - - name: Install dependencies - run: | - if [[ "$LINK_MKL" == "true" ]]; then - BLAS_PKGS="blas-devel=*=*mkl" - else - BLAS_PKGS="blas-devel=*=*openblas" - fi - if [[ "$PYTHON_VERSION" == "3.9" ]]; then - conda install scipy=1.5 numpy=1.19 pytest $BLAS_PKGS pkg-config - elif [[ "$PYTHON_VERSION" == "3.10" ]]; then - conda install scipy=1.7 numpy=1.21 pytest $BLAS_PKGS pkg-config - elif [[ "$PYTHON_VERSION" == "3.11" ]]; then - conda install scipy=1.9.3 numpy=1.23.4 pytest $BLAS_PKGS pkg-config - elif [[ "$PYTHON_VERSION" == "3.12" || "$PYTHON_VERSION" == "3.13" ]]; then - conda install scipy numpy pytest $BLAS_PKGS pkg-config - fi - - name: Build - run: | - python -c "import numpy as np; print('NUMPY BLAS INFOS'); print(np.show_config())" - if [[ "$LINK_MKL" == "true" ]]; then - python -m pip install --verbose -Csetup-args=-Dlink_mkl=true . - else - python -m pip install --verbose . - fi - - name: Test - run: | - pytest - rm -rf build/ - - # from here to end it's a copy-paste, with few changes, of - # https://github.com/pypa/cibuildwheel/blob/main/examples/github-deploy.yml - - build_wheels: - name: Build wheels on ${{ matrix.os }} - runs-on: ${{ matrix.os }} - strategy: - matrix: - # macos-13 is an intel runner, macos-14 is apple silicon - os: [ubuntu-latest, macos-14, windows-latest, macos-13] - - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - - name: Set up QEMU for aarch64 compilation on Linux - if: runner.os == 'Linux' - uses: docker/setup-qemu-action@v3 - with: - platforms: all - - - name: Install conda on Windows - if: runner.os == 'Windows' - uses: conda-incubator/setup-miniconda@v3 - with: - miniconda-version: "latest" - channels: conda-forge, anaconda - - - name: Install openblas from conda on Windows - if: runner.os == 'Windows' - run: conda install -y openblas pkgconfig - - - name: Build wheels - uses: pypa/cibuildwheel@v2.23.2 - - - uses: actions/upload-artifact@v4 - with: - name: cibw-wheels-${{ matrix.os }}-${{ strategy.job-index }} - path: ./wheelhouse/*.whl - - build_sdist: - name: Build source distribution - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - with: - submodules: true - - - name: Build sdist - run: pipx run build --sdist -Csetup-args=-Dsdist_mode=true - - - uses: actions/upload-artifact@v4 - with: - name: cibw-sdist - path: dist/*.tar.gz - - upload_pypi: - needs: [build_wheels, build_sdist] - runs-on: ubuntu-latest - environment: pypi - permissions: - id-token: write - # We can also upload always, with skip-existing: true, below - # We upload on every push event (only master, above) that is a new tag - if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags') - # Only run this step on GH release - # if: github.event_name == 'release' && github.event.action == 'published' - steps: - - uses: actions/download-artifact@v4 - with: - # unpacks all CIBW artifacts into dist/ - pattern: cibw-* - path: dist - merge-multiple: true - - - uses: pypa/gh-action-pypi-publish@release/v1 - with: - skip-existing: false # Fail loudly for duplicates. - # To test: - # with: - # repository-url: https://test.pypi.org/legacy/ diff --git a/.github/workflows/freethreading_tests.yml b/.github/workflows/freethreading_tests.yml new file mode 100644 index 00000000..50a6813c --- /dev/null +++ b/.github/workflows/freethreading_tests.yml @@ -0,0 +1,33 @@ +name: Free Threading Tests +on: [push, pull_request] +jobs: + tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + - name: Grab pythoncapi-compat + run: git clone --depth 1 https://github.com/python/pythoncapi-compat scs/pythoncapi-compat + - name: Install uv and set the python version + uses: astral-sh/setup-uv@v6 + with: + python-version: 3.13t + activate-environment: true + enable-cache: false + - name: Set up Python + run: uv python install + - name: Install OpenBLAS + run: | + sudo apt update + sudo apt install -y libopenblas-dev + - name: Install Dependencies + run: | + uv pip install numpy scipy pytest-timeout pytest-durations pytest-run-parallel + - name: Build + run: | + uv pip install -v . + - name: Run Tests + run: | + set -xe + uv run --no-project python -m pytest --parallel-threads=4 --iterations=1 -v -s --timeout=600 --durations=10 -m "not thread_unsafe" diff --git a/pyproject.toml b/pyproject.toml index 8f2947cd..602fb6c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,9 @@ version = "3.2.7" description = 'Splitting conic solver' readme = 'README.md' requires-python = '>=3.7' +classifiers = [ + 'Programming Language :: Python :: Free Threading :: 1 - Unstable', +] license = {file = 'LICENSE'} authors = [ {name = "Brendan O'Donoghue", email = "bodonoghue85@gmail.com"}] @@ -80,3 +83,8 @@ inherit.before-all = "append" before-all = [ # "apk update", "apk search -v '*blas*'", "apk add openblas-dev"] + +[tool.pytest.ini_options] +testpaths = [ + "test", +] diff --git a/scs/scsmodule.h b/scs/scsmodule.h index 1e05ffca..f34014ec 100644 --- a/scs/scsmodule.h +++ b/scs/scsmodule.h @@ -54,6 +54,10 @@ static PyObject *moduleinit(void) { #endif #endif +#ifdef Py_GIL_DISABLED + PyUnstable_Module_SetGIL(m, Py_MOD_GIL_NOT_USED); +#endif + if (m == NULL) { return NULL; } diff --git a/scs/scsobject.h b/scs/scsobject.h index da2de2ab..e2340ba2 100644 --- a/scs/scsobject.h +++ b/scs/scsobject.h @@ -2,10 +2,15 @@ #define PY_SCSOBJECT_H /* SCS Object type */ +#include "pythoncapi-compat/pythoncapi_compat.h" typedef struct { - PyObject_HEAD ScsWork *work; /* Workspace */ + PyObject_HEAD + ScsWork *work; /* Workspace */ ScsSolution *sol; /* Solution, keep around for warm-starts */ scs_int m, n; +#ifdef Py_GIL_DISABLED + PyMutex lock; +#endif } SCS; /* Just a helper struct to store the PyArrayObjects that need Py_DECREF */ @@ -563,10 +568,16 @@ static int SCS_init(SCS *self, PyObject *args, PyObject *kwargs) { self->sol->s = (scs_float *)scs_calloc(self->m, sizeof(scs_float)); /* release the GIL */ +#ifdef Py_GIL_DISABLED + PyMutex_Lock(&self->lock); +#endif Py_BEGIN_ALLOW_THREADS; self->work = scs_init(d, k, stgs); /* reacquire the GIL */ Py_END_ALLOW_THREADS; +#ifdef Py_GIL_DISABLED + PyMutex_Unlock(&self->lock); +#endif /* no longer need pointers to arrays that held primitives */ free_py_scs_data(d, k, stgs, &ps); @@ -582,6 +593,7 @@ static PyObject *SCS_solve(SCS *self, PyObject *args) { ScsSolution *sol = self->sol; npy_intp veclen[1]; int scs_float_type = scs_get_float_type(); + int errind = -1; if (!self->work) { return none_with_error("Workspace not initialized!"); @@ -606,17 +618,26 @@ static PyObject *SCS_solve(SCS *self, PyObject *args) { if (_warm_start) { /* If any of these of missing, we use the values in sol */ if ((void *)warm_x != Py_None) { - if (get_warm_start(self->sol->x, self->n, warm_x) < 0) { + Py_BEGIN_CRITICAL_SECTION(self); + errind = get_warm_start(self->sol->x, self->n, warm_x); + Py_END_CRITICAL_SECTION(); + if (errind < 0) { return none_with_error("Unable to parse x warm-start"); } } if ((void *)warm_y != Py_None) { - if (get_warm_start(self->sol->y, self->m, warm_y) < 0) { + Py_BEGIN_CRITICAL_SECTION(self); + errind = get_warm_start(self->sol->y, self->m, warm_y); + Py_END_CRITICAL_SECTION(); + if (errind < 0) { return none_with_error("Unable to parse y warm-start"); } } if ((void *)warm_s != Py_None) { - if (get_warm_start(self->sol->s, self->m, warm_s) < 0) { + Py_BEGIN_CRITICAL_SECTION(self); + errind = get_warm_start(self->sol->s, self->m, warm_s); + Py_END_CRITICAL_SECTION(); + if (errind < 0) { return none_with_error("Unable to parse s warm-start"); } } @@ -627,11 +648,17 @@ static PyObject *SCS_solve(SCS *self, PyObject *args) { PyObject *x, *y, *s, *return_dict, *info_dict; scs_float *_x, *_y, *_s; /* release the GIL */ +#ifdef Py_GIL_DISABLED + PyMutex_Lock(&self->lock); +#endif Py_BEGIN_ALLOW_THREADS; /* Solve! */ scs_solve(self->work, sol, &info, _warm_start); /* reacquire the GIL */ Py_END_ALLOW_THREADS; +#ifdef Py_GIL_DISABLED + PyMutex_Unlock(&self->lock); +#endif veclen[0] = self->n; _x = scs_malloc(self->n * sizeof(scs_float)); @@ -752,10 +779,16 @@ PyObject *SCS_update(SCS *self, PyObject *args) { } /* release the GIL */ +#ifdef Py_GIL_DISABLED + PyMutex_Lock(&self->lock); +#endif Py_BEGIN_ALLOW_THREADS; scs_update(self->work, b, c); /* reacquire the GIL */ Py_END_ALLOW_THREADS; +#ifdef Py_GIL_DISABLED + PyMutex_Unlock(&self->lock); +#endif Py_DECREF(b_new); Py_DECREF(c_new); @@ -766,7 +799,9 @@ PyObject *SCS_update(SCS *self, PyObject *args) { /* Deallocate SCS object */ static scs_int SCS_finish(SCS *self) { if (self->work) { + Py_BEGIN_CRITICAL_SECTION(self); scs_finish(self->work); + Py_END_CRITICAL_SECTION(); } if (self->sol) { scs_free(self->sol->x); diff --git a/scs/scspy.c b/scs/scspy.c index 215134d2..51cc6602 100644 --- a/scs/scspy.c +++ b/scs/scspy.c @@ -17,6 +17,7 @@ #include "numpy/arrayobject.h" /* Numpy C API */ #include "scs.h" /* SCS API */ #include "scs_types.h" /* SCS primitive types */ +#include "pythoncapi-compat/pythoncapi_compat.h" /* The PyInt variable is a PyLong in Python3.x. */ #if PY_MAJOR_VERSION >= 3 diff --git a/test/test_scs_concurrent_solve.py b/test/test_scs_concurrent_solve.py new file mode 100644 index 00000000..0b550042 --- /dev/null +++ b/test/test_scs_concurrent_solve.py @@ -0,0 +1,283 @@ +import pytest +import threading +import numpy as np +import scipy.sparse as sp +import scs +import sys +from numpy.testing import assert_almost_equal +import time +import queue +from concurrent.futures import ThreadPoolExecutor, as_completed +import gen_random_cone_prob as tools + +# --- Global constant --- +FAIL = "failure" + +SHARED_DATA_FOR_TEST = { + "A": sp.csc_matrix([1.0, -1.0]).T.tocsc(), + "b": np.array([1.0, 0.0]), + "c": np.array([-1.0]) +} +SHARED_CONE_CONFIG_FOR_TEST = {"q": [], "l": 2} +EXPECTED_X0_FOR_SHARED_PROBLEM_TEST = 1.0 +NUM_CONCURRENT_SOLVES=8 + +# Cone definition +K_CONFIG = { + "z": 5, + "l": 10, + "q": [3, 4], + "s": [2, 3], + "ep": 4, + "ed": 4, + "p": [-0.25, 0.5], +} + +SOLVER_PARAMS_CONFIG = { + "verbose": False, + "eps_abs": 1e-5, + "eps_rel": 1e-5, + "eps_infeas": 1e-5, + "max_iters": 3500, +} + +UPDATE_TEST_C_NEW = np.array([1.0]) +UPDATE_TEST_B_NEW = np.array([1.0, 1.0]) + +EXPECTED_X1_UPDATE = 1.0 +EXPECTED_X2_UPDATE = 0.0 +EXPECTED_X3_UPDATE = -1.0 + +# --- Worker function executed by each thread --- +def solve_one_random_cone_problem(cone_def, solver_params_def, worker_id): + """ + Generates a random feasible cone problem, solves it with SCS, and performs assertions. + This function is intended to be run in a separate thread. + Returns True on success, raises AssertionError on failure. + """ + thread_name = threading.current_thread().name + print(f"[Worker {worker_id} on {thread_name}]") + + m_dims = tools.get_scs_cone_dims(cone_def) + n_vars = m_dims // 2 + if n_vars == 0: n_vars = 1 + + # Generate a new feasible problem for each worker + data, p_star_expected = tools.gen_feasible(cone_def, n=n_vars, density=0.2) + + print(f"[Worker {worker_id} on {thread_name}]: Problem generated. m={m_dims}, n={n_vars}. Expected p_star ~ {p_star_expected:.4f}") + + # Create and run the SCS solver + solver = scs.SCS(data, cone_def, use_indirect=False, gpu=False, **solver_params_def) + sol = solver.solve() + x_sol = sol["x"] + y_sol = sol["y"] + s_sol = sol["s"] + info = sol["info"] + + print(f"[Worker {worker_id} on {thread_name}]: Solved. Status: {info['status']}. Pobj: {info['pobj']:.4f}, Iters: {info['iter']}") + + # Assertions (similar to test_solve_feasible) + # 1. Objective value + np.testing.assert_almost_equal(np.dot(data["c"], x_sol), p_star_expected, decimal=2, + err_msg=f"Worker {worker_id}: Objective value mismatch.") + + # 2. Primal feasibility (Ax - b + s = 0 => ||Ax - b + s|| ~ 0) + # Relaxed tolerance from 1e-3 to 5e-3 + primal_residual_norm = np.linalg.norm(data["A"] @ x_sol - data["b"] + s_sol) + np.testing.assert_array_less(primal_residual_norm, 5e-3, + err_msg=f"Worker {worker_id}: Primal residual norm too high: {primal_residual_norm}") + + # 3. Dual feasibility (A'y + c = 0 => ||A'y + c|| ~ 0 for LP part, more complex for cones) + # Relaxed tolerance from 1e-3 to 5e-3 + dual_residual_norm = np.linalg.norm(data["A"].T @ y_sol + data["c"]) + np.testing.assert_array_less(dual_residual_norm, 5e-3, + err_msg=f"Worker {worker_id}: Dual residual norm too high: {dual_residual_norm}") + + # 4. Complementary slackness (s'y ~ 0) + complementarity = s_sol.T @ y_sol + np.testing.assert_almost_equal(complementarity, 0.0, decimal=3, # Check if close to zero + err_msg=f"Worker {worker_id}: Complementary slackness violation: {complementarity}") + + # 5. Slack variable s in primal cone K (s = proj_K(s)) + projected_s = tools.proj_cone(s_sol, cone_def) + np.testing.assert_almost_equal(s_sol, projected_s, decimal=3, + err_msg=f"Worker {worker_id}: Slack variable s not in primal cone.") + + # 6. Dual variable y in dual cone K* (y = proj_K*(y)) + projected_y_dual = tools.proj_dual_cone(y_sol, cone_def) + np.testing.assert_almost_equal(y_sol, projected_y_dual, decimal=3, + err_msg=f"Worker {worker_id}: Dual variable y not in dual cone.") + + print(f"[Worker {worker_id} on {thread_name}]: All assertions passed.") + return {"id": worker_id, "status": "success", "pobj": info['pobj'], "iters": info['iter']} + +# --- Pytest test function using ThreadPoolExecutor --- +pytest.mark.skipif(sys._is_gil_enabled(), "Only for free threaded") +def test_concurrent_independent_cone_solves(): + """ + Tests running multiple independent SCS solves concurrently using ThreadPoolExecutor. + Each solve uses the provided use_indirect and gpu flags. + """ + completed_solves = 0 + failed_solves_details = [] + + with ThreadPoolExecutor(max_workers=NUM_CONCURRENT_SOLVES) as executor: + futures = [] + for i in range(NUM_CONCURRENT_SOLVES): + worker_id = i + 1 + future = executor.submit( + solve_one_random_cone_problem, + K_CONFIG, + SOLVER_PARAMS_CONFIG, + worker_id + ) + futures.append(future) + print(f"pytest: Submitted task for worker {worker_id}.") + + print(f"\npytest: All {NUM_CONCURRENT_SOLVES} tasks submitted. Waiting for completion...\n") + + for future in as_completed(futures, timeout=NUM_CONCURRENT_SOLVES * 60.0): + # Determine worker_id based on the future object's position in the original list. + # This is a bit fragile if futures list were modified, but common for simple cases. + # A more robust way would be to wrap future with its ID if needed for complex scenarios. + worker_id_from_future = -1 # Default / placeholder + for idx, f_item in enumerate(futures): + if f_item == future: + worker_id_from_future = idx + 1 + break + + try: + result = future.result(timeout=60.0) + print(f"pytest: Worker {result.get('id', worker_id_from_future)} completed successfully: {result}") + completed_solves += 1 + except Exception as e: + error_detail = f"Worker {worker_id_from_future} failed: {type(e).__name__}: {e}" + print(f"pytest: ERROR - {error_detail}") + failed_solves_details.append(error_detail) + + print(f"\npytest: Test execution finished.") + print(f"Total solves attempted: {NUM_CONCURRENT_SOLVES}") + print(f"Successful solves: {completed_solves}") + print(f"Failed solves: {len(failed_solves_details)}") + + if failed_solves_details: + pytest.fail(f"{len(failed_solves_details)} out of {NUM_CONCURRENT_SOLVES} concurrent solves failed.\n" + f"Failures:\n" + "\n".join(failed_solves_details)) + + assert completed_solves == NUM_CONCURRENT_SOLVES, \ + f"Expected {NUM_CONCURRENT_SOLVES} successful concurrent solves, but got {completed_solves}." + + print(f"pytest: All {NUM_CONCURRENT_SOLVES} concurrent solves passed.") + +def worker_perform_solve_update_sequence(solver_params_def, worker_id): + """ + Performs a sequence of solve and update operations on an SCS instance. + """ + thread_name = threading.current_thread().name + print(f"[Worker {worker_id} (UpdateSeq) on {thread_name}]: Starting") + + solver = scs.SCS(SHARED_DATA_FOR_TEST, SHARED_CONE_CONFIG_FOR_TEST, + use_indirect=False, gpu=False, **solver_params_def) + + print(f"[Worker {worker_id} (UpdateSeq) on {thread_name}]: Performing initial solve.") + sol1 = solver.solve() + np.testing.assert_almost_equal(sol1["x"][0], EXPECTED_X1_UPDATE, decimal=2, + err_msg=f"Worker {worker_id} (UpdateSeq): Initial solve failed.") + print(f"[Worker {worker_id} (UpdateSeq) on {thread_name}]: Initial solve OK, x={sol1['x'][0]:.2f}") + + print(f"[Worker {worker_id} (UpdateSeq) on {thread_name}]: Updating c and solving.") + solver.update(c=UPDATE_TEST_C_NEW) + sol2 = solver.solve() + np.testing.assert_almost_equal(sol2["x"][0], EXPECTED_X2_UPDATE, decimal=2, + err_msg=f"Worker {worker_id} (UpdateSeq): Solve after c update failed.") + print(f"[Worker {worker_id} (UpdateSeq) on {thread_name}]: Solve after c update OK, x={sol2['x'][0]:.2f}") + + print(f"[Worker {worker_id} (UpdateSeq) on {thread_name}]: Updating b and solving.") + solver.update(b=UPDATE_TEST_B_NEW) + sol3 = solver.solve() + np.testing.assert_almost_equal(sol3["x"][0], EXPECTED_X3_UPDATE, decimal=2, + err_msg=f"Worker {worker_id} (UpdateSeq): Solve after b update failed.") + print(f"[Worker {worker_id} (UpdateSeq) on {thread_name}]: Solve after b update OK, x={sol3['x'][0]:.2f}") + + print(f"[Worker {worker_id} (UpdateSeq) on {thread_name}]: All update sequence assertions passed.") + return {"id": worker_id, "type": "UpdateSeq", "status": "success"} + + +# --- Test for Concurrent Solve and Update Sequences --- +pytest.mark.skipif(sys._is_gil_enabled(), "Only for free threaded") +def test_concurrent_solve_update_sequences(): + """ + Tests running multiple SCS solve-update-solve sequences concurrently. + """ + print(f"\npytest: Starting concurrent solve-update sequences test (use_indirect=False, gpu=False)") + + completed_jobs = 0 + failed_jobs_details = [] + + with ThreadPoolExecutor(max_workers=NUM_CONCURRENT_SOLVES) as executor: + futures = [] + for i in range(NUM_CONCURRENT_SOLVES): + worker_id = i + 1 + future = executor.submit( + worker_perform_solve_update_sequence, + SOLVER_PARAMS_CONFIG, worker_id + ) + futures.append(future) + print(f"pytest: Submitted task for UpdateSeq worker {worker_id}.") + + print(f"\npytest: All {NUM_CONCURRENT_SOLVES} UpdateSeq tasks submitted. Waiting for completion...\n") + for future in as_completed(futures, timeout=NUM_CONCURRENT_SOLVES * 30.0): + worker_id_from_future = futures.index(future) + 1 + try: + result = future.result(timeout=30.0) + print(f"pytest: UpdateSeq Worker {result.get('id', worker_id_from_future)} completed successfully: {result}") + completed_jobs += 1 + except Exception as e: + error_detail = f"UpdateSeq Worker {worker_id_from_future} failed: {type(e).__name__}: {e}" + print(f"pytest: ERROR - {error_detail}") + failed_jobs_details.append(error_detail) + + print(f"\npytest: UpdateSeq test execution finished.") + print(f"Total UpdateSeq jobs attempted: {NUM_CONCURRENT_SOLVES}, Successful: {completed_jobs}, Failed: {len(failed_jobs_details)}") + + if failed_jobs_details: + pytest.fail(f"{len(failed_jobs_details)} out of {NUM_CONCURRENT_SOLVES} concurrent UpdateSeq jobs failed.\nFailures:\n" + "\n".join(failed_jobs_details)) + assert completed_jobs == NUM_CONCURRENT_SOLVES, f"Expected {NUM_CONCURRENT_SOLVES} successful UpdateSeq jobs, got {completed_jobs}." + print(f"pytest: All {NUM_CONCURRENT_SOLVES} concurrent UpdateSeq jobs passed.") + +# --- Worker function for threads --- +def worker_solve_on_shared_instance(test_id, shared_solver_instance, expected_x0, results_queue): + """ + Attempts to call solve() on a shared SCS solver instance. + Reports result or exception to the main thread via a queue. + """ + print(f"[Thread {test_id}]: Attempting to call solve() on the shared solver instance.") + try: + sol = shared_solver_instance.solve(warm_start=False, x=None, y=None, s=None) + if sol["info"]["status"] != "solved": + # Report failure status + results_queue.put({ + "id": test_id, + "status": "solver_fail_status", + "info": sol["info"], + "x": sol.get("x") + }) + print(f"[Thread {test_id}]: Solver status: {sol['info']['status']}.") + return + + assert_almost_equal(sol["x"][0], expected_x0, decimal=2) + results_queue.put({ + "id": test_id, + "status": "success", + "x0": sol["x"][0], + "info": sol["info"] + }) + print(f"[Thread {test_id}]: Call to solve() completed. Expected x[0] ~ {expected_x0}, Got x[0] ~ {sol['x'][0]:.2f}.") + + except AssertionError as e: + results_queue.put({"id": test_id, "status": "assertion_error", "error": e}) + print(f"[Thread {test_id}]: TEST FAILED (result inconsistent). Assertion Error: {e}") + except Exception as e: + results_queue.put({"id": test_id, "status": "exception", "error": e, "type": type(e).__name__}) + print(f"[Thread {test_id}]: An unexpected error occurred: {type(e).__name__}: {e}.") diff --git a/test/test_scs_ft.py b/test/test_scs_ft.py new file mode 100644 index 00000000..fad5a768 --- /dev/null +++ b/test/test_scs_ft.py @@ -0,0 +1,158 @@ +import pytest +import threading +import numpy as np +import scipy.sparse as sp +import scs +import sys +from numpy.testing import assert_almost_equal +import time +import queue + +# --- Global constant --- +FAIL = "failure" + +SHARED_DATA_FOR_TEST = { + "A": sp.csc_matrix([1.0, -1.0]).T.tocsc(), + "b": np.array([1.0, 0.0]), + "c": np.array([-1.0]) +} +SHARED_CONE_CONFIG_FOR_TEST = {"q": [], "l": 2} +EXPECTED_X0_FOR_SHARED_PROBLEM_TEST = 1.0 + +# --- Worker function for threads --- +def worker_solve_on_shared_instance(test_id, shared_solver_instance, expected_x0, results_queue): + """ + Attempts to call solve() on a shared SCS solver instance. + Reports result or exception to the main thread via a queue. + """ + print(f"[Thread {test_id}]: Attempting to call solve() on the shared solver instance.") + try: + sol = shared_solver_instance.solve(warm_start=False, x=None, y=None, s=None) + if sol["info"]["status"] != "solved": + # Report failure status + results_queue.put({ + "id": test_id, + "status": "solver_fail_status", + "info": sol["info"], + "x": sol.get("x") + }) + print(f"[Thread {test_id}]: Solver status: {sol['info']['status']}.") + return + + assert_almost_equal(sol["x"][0], expected_x0, decimal=2) + results_queue.put({ + "id": test_id, + "status": "success", + "x0": sol["x"][0], + "info": sol["info"] + }) + print(f"[Thread {test_id}]: Call to solve() completed. Expected x[0] ~ {expected_x0}, Got x[0] ~ {sol['x'][0]:.2f}.") + + except AssertionError as e: + results_queue.put({"id": test_id, "status": "assertion_error", "error": e}) + print(f"[Thread {test_id}]: TEST FAILED (result inconsistent). Assertion Error: {e}") + except Exception as e: + results_queue.put({"id": test_id, "status": "exception", "error": e, "type": type(e).__name__}) + print(f"[Thread {test_id}]: An unexpected error occurred: {type(e).__name__}: {e}.") + +pytest.mark.skipif(sys._is_gil_enabled(), "Only for free threaded") +def test_concurrent_solve_on_single_scs_instance(): + """ + Tests concurrent calls to solve() on a SINGLE scs.SCS instance. + """ + print("\npytest: Starting test: Concurrent calls to solve() on a SINGLE SCS instance.") + print("pytest: WARNING: This test probes potentially unsafe behavior.\n") + + # ONE SCS solver instance that will be shared among threads + print(f"pytest: Creating a single shared SCS solver instance with data and cone={SHARED_CONE_CONFIG_FOR_TEST}") + try: + shared_solver = scs.SCS( + SHARED_DATA_FOR_TEST, + cone=SHARED_CONE_CONFIG_FOR_TEST, + verbose=False, + normalize=False, + max_iters=2000 + ) + print("pytest: Shared SCS solver instance created successfully.") + except Exception as e: + pytest.fail(f"pytest: Failed to create the shared SCS solver instance: {type(e).__name__}: {e}") + + + num_concurrent_calls = 4 + threads = [] + results_queue = queue.Queue() + + print(f"\npytest: Launching {num_concurrent_calls} threads to call solve() on the shared instance...\n") + + for i in range(num_concurrent_calls): + test_id = i + 1 + thread = threading.Thread( + target=worker_solve_on_shared_instance, + args=(test_id, shared_solver, EXPECTED_X0_FOR_SHARED_PROBLEM_TEST, results_queue) + ) + threads.append(thread) + thread.start() + print(f"pytest: Launched thread {test_id} to call solve() on shared instance.") + + print("\npytest: All threads launched. Waiting for completion (timeout 10s per thread)...\n") + + for i, thread in enumerate(threads): + thread.join(timeout=10.0) + if thread.is_alive(): + print(f"pytest: WARNING - Thread {i+1} is still alive after join timeout. Test may hang or be inconclusive for this thread.") + else: + print(f"pytest: Thread {i+1} has finished.") + + print("\npytest: All threads have attempted to call solve() on the shared instance.") + + success_count = 0 + solver_fail_status_count = 0 + assertion_error_count = 0 + exception_count = 0 + + results_summary = [] + + while not results_queue.empty(): + try: + result = results_queue.get_nowait() + results_summary.append(result) + if result["status"] == "success": + success_count += 1 + elif result["status"] == "solver_fail_status": + solver_fail_status_count += 1 + elif result["status"] == "assertion_error": + assertion_error_count += 1 + elif result["status"] == "exception": + exception_count += 1 + except queue.Empty: + break + except Exception as e: + print(f"pytest: Error retrieving result from queue: {e}") + + print("\n--- Results Summary ---") + for res_idx, res_item in enumerate(results_summary): + print(f"Result {res_idx + 1}: {res_item}") + print("-----------------------") + print(f"Total threads launched: {num_concurrent_calls}") + print(f"Threads reported results: {len(results_summary)}") + print(f"Successful solves (matching expected): {success_count}") + print(f"Solver reported non-success status: {solver_fail_status_count}") + print(f"Assertion errors (result mismatch): {assertion_error_count}") + print(f"Other Python exceptions during solve: {exception_count}") + + if exception_count > 0: + exception_details = [res for res in results_summary if res["status"] == "exception"] + pytest.fail(f"{exception_count} thread(s) raised an unexpected Python exception during solve(). Details: {exception_details}") + + if assertion_error_count > 0: + assertion_details = [res for res in results_summary if res["status"] == "assertion_error"] + pytest.fail(f"{assertion_error_count} thread(s) had an assertion error (result mismatch). This indicates inconsistency. Details: {assertion_details}") + + if solver_fail_status_count > 0: + fail_status_details = [res for res in results_summary if res["status"] == "solver_fail_status"] + pytest.fail(f"{solver_fail_status_count} thread(s) resulted in a non-success solver status. Details: {fail_status_details}") + + + assert success_count == num_concurrent_calls, \ + f"Expected all {num_concurrent_calls} threads to succeed and match expected value, but only {success_count} did. " \ + f"Solver fails: {solver_fail_status_count}, Assertion errors: {assertion_error_count}, Exceptions: {exception_count}" diff --git a/test/test_solve_random_cone_prob.py b/test/test_solve_random_cone_prob.py index 68b3420d..650adbf1 100644 --- a/test/test_solve_random_cone_prob.py +++ b/test/test_solve_random_cone_prob.py @@ -62,7 +62,7 @@ def test_solve_feasible(use_indirect, gpu): np.linalg.norm(data["A"].T @ y + data["c"]), 1e-3 ) np.testing.assert_almost_equal(s.T @ y, 0.0) - np.testing.assert_almost_equal(s, tools.proj_cone(s, K), decimal=4) + np.testing.assert_almost_equal(s, tools.proj_cone(s, K), decimal=3) np.testing.assert_almost_equal(y, tools.proj_dual_cone(y, K), decimal=3)