From 34029f756f8141bfcc2cd8ceb978fc1016a8b617 Mon Sep 17 00:00:00 2001 From: ilyabasharov Date: Fri, 19 Apr 2024 13:50:33 +0300 Subject: [PATCH 1/7] [Add] docker, fix install --- .dockerignore | 0 .pre-commit-config.yaml | 31 + Dockerfile.cuda | 272 +++++++ LICENSE | 661 ++++++++++++++++++ docker-compose.yml | 25 + pyproject.toml | 112 +++ requirements.txt | 5 - setup.py | 23 - stream_tools/__init__.py | 1 + stream_tools/action_tracker/__init__.py | 2 +- .../action_tracker/movement_tracker.py | 38 +- stream_tools/config/__init__.py | 2 +- stream_tools/config/base_config.py | 9 +- stream_tools/config/ivi_config.py | 16 +- stream_tools/dataloader/__init__.py | 2 +- stream_tools/dataloader/base.py | 97 +-- stream_tools/dataloader/ivideon_loader.py | 153 ++-- stream_tools/dataloader/opencv_loader.py | 40 +- stream_tools/model/__init__.py | 2 +- stream_tools/model/classifier.py | 29 +- stream_tools/model/detector.py | 29 +- stream_tools/pipeline/__init__.py | 2 +- stream_tools/pipeline/mutlitrack.py | 154 ++-- stream_tools/pipeline/pipeline_base.py | 57 +- 24 files changed, 1354 insertions(+), 408 deletions(-) create mode 100644 .dockerignore create mode 100644 .pre-commit-config.yaml create mode 100644 Dockerfile.cuda create mode 100644 LICENSE create mode 100644 docker-compose.yml create mode 100644 pyproject.toml delete mode 100644 requirements.txt delete mode 100644 setup.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..e69de29 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..b14540d --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,31 @@ +repos: + - repo: https://github.com/PyCQA/flake8.git + rev: 7.0.0 + hooks: + - id: flake8 + - repo: https://github.com/PyCQA/isort.git + rev: 5.13.2 + hooks: + - id: isort + - repo: https://github.com/pre-commit/mirrors-yapf.git + rev: v0.32.0 + hooks: + - id: yapf + additional_dependencies: [toml] + - repo: 
https://github.com/pre-commit/pre-commit-hooks.git + rev: v4.6.0 + hooks: + - id: trailing-whitespace + - id: check-yaml + - id: end-of-file-fixer + - id: requirements-txt-fixer + - id: double-quote-string-fixer + - id: check-merge-conflict + - id: fix-encoding-pragma + args: ["--remove"] + - id: mixed-line-ending + args: ["--fix=lf"] + - repo: https://github.com/codespell-project/codespell.git + rev: v2.2.6 + hooks: + - id: codespell diff --git a/Dockerfile.cuda b/Dockerfile.cuda new file mode 100644 index 0000000..4e46f98 --- /dev/null +++ b/Dockerfile.cuda @@ -0,0 +1,272 @@ +# syntax=docker/dockerfile:1 + +# Does not work because of RTSP connection +# FROM nvcr.io/nvidia/pytorch:23.09-py3 AS base + +# Variables used at build time. +## Base CUDA version. See all supported version at https://hub.docker.com/r/nvidia/cuda/tags?page=2&name=-devel-ubuntu +ARG CUDA_VERSION=11.8.0 +## Base Ubuntu version. +ARG OS_VERSION=22.04 + +# Define base image. +FROM nvidia/cuda:${CUDA_VERSION}-cudnn8-devel-ubuntu${OS_VERSION} AS base + +# Dublicate args because of the visibility zone +# https://docs.docker.com/engine/reference/builder/#understand-how-arg-and-from-interact +ARG CUDA_VERSION +ARG OS_VERSION + +## Base TensorRT version. +ARG TRT_VERSION=8.6.1.6 +## Base PyTorch version. +ARG TORCH_VERSION=2.2.0 +## Base TorchVision version. +ARG TORCHVISION_VERSION=0.17.0 +## Base OpenCV version. +ARG OPENCV_VERSION=4.8.0.74 +## Base CMake version. +ARG CMAKE_VERSION=3.26.0 +## Base Timezone +ARG TZ=Europe/Moscow + +# Set environment variables. +## Set non-interactive to prevent asking for user inputs blocking image creation. +ENV DEBIAN_FRONTEND=noninteractive \ + ## Set timezone as it is required by some packages. + TZ=${TZ} \ + ## CUDA Home, required to find CUDA in some packages. + CUDA_HOME="/usr/local/cuda" \ + ## Set LD_LIBRARY_PATH for local libs (glog etc.) 
+ LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/usr/local/lib" \ + ## Accelerate compilation flags (use all cores) + MAKEFLAGS=-j$(nproc) \ + ## Torch GPU arch list + TORCH_CUDA_ARCH_LIST="6.0 6.1 7.0 7.5 8.0 8.6 8.9" + +RUN apt-get update && \ + apt-get install \ + --no-install-recommends \ + --yes \ + # Basics + gcc \ + g++ \ + yasm \ + unzip \ + git \ + wget \ + libx264-dev \ + libgnutls28-dev \ + python3 \ + python3-pip \ + python3-dev \ + python3-setuptools \ + # Requirements from ultralytics + libgl1 \ + libglib2.0-0 \ + gnupg \ + libusb-1.0-0 \ + # Linux security updates + # https://security.snyk.io/vuln/SNYK-UBUNTU1804-OPENSSL-3314796 + openssl \ + tar \ + # Image I/O libs + libjpeg-dev \ + libpng-dev \ + libtiff-dev \ + # Parallelism library C++ for CPU + libtbb-dev \ + # Optimization libraries for OpenCV + libatlas-base-dev \ + gfortran \ + # Video/Audio Libs - FFMPEG, GSTREAMER, x264 and so on. + ## AV Lib [does not work with tensor_stream] + # libavcodec-dev \ + # libavformat-dev \ + # libswscale-dev \ + ## Gstreamer + libgstreamer1.0-dev \ + libgstreamer-plugins-base1.0-dev && \ + ## Clean cached files + ln -s /usr/bin/python3 /usr/bin/python && \ + apt-get clean --yes && \ + apt-get autoremove --yes && \ + rm -rf /var/lib/apt/lists/* && \ + rm -rf /var/cache/apt/archives/* && \ + ## Set timezone + ln -snf /usr/share/zoneinfo/${TZ} /etc/localtime && \ + echo ${TZ} > /etc/timezone + +SHELL ["/bin/bash", "-c"] + +# Install CMake +RUN wget https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-Linux-x86_64.sh \ + -q -O /tmp/cmake-install.sh \ + && chmod u+x /tmp/cmake-install.sh \ + && mkdir /usr/bin/cmake \ + && /tmp/cmake-install.sh --skip-license --prefix=/usr/bin/cmake \ + && rm /tmp/cmake-install.sh + +ENV PATH="/usr/bin/cmake/bin:${PATH}" + +# Install TensorRT +## Now only supported for Ubuntu 22.04 +## Cannot install via pip because cuda-based errors +RUN v="${TRT_VERSION}-1+cuda${CUDA_VERSION%.*}" 
distro="ubuntu${OS_VERSION//./}" arch=$(uname -m) && \ + wget https://developer.download.nvidia.com/compute/cuda/repos/${distro}/${arch}/cuda-archive-keyring.gpg && \ + mv cuda-archive-keyring.gpg /usr/share/keyrings/cuda-archive-keyring.gpg && \ + echo "deb [signed-by=/usr/share/keyrings/cuda-archive-keyring.gpg] https://developer.download.nvidia.com/compute/cuda/repos/${distro}/${arch}/ /" | \ + tee /etc/apt/sources.list.d/cuda-${distro}-${arch}.list && \ + apt-get update && \ + apt-get install \ + libnvinfer-headers-dev=${v} \ + libnvinfer-dispatch8=${v} \ + libnvinfer-lean8=${v} \ + libnvinfer-dev=${v} \ + libnvinfer-headers-plugin-dev=${v} \ + libnvinfer-lean-dev=${v} \ + libnvinfer-dispatch-dev=${v} \ + libnvinfer-plugin-dev=${v} \ + libnvinfer-vc-plugin-dev=${v} \ + libnvparsers-dev=${v} \ + libnvonnxparsers-dev=${v} \ + libnvinfer8=${v} \ + libnvinfer-plugin8=${v} \ + libnvinfer-vc-plugin8=${v} \ + libnvparsers8=${v} \ + libnvonnxparsers8=${v} && \ + apt-get install \ + python3-libnvinfer=${v} \ + tensorrt-dev=${v} && \ + apt-mark hold tensorrt-dev + +# Build nvidia codec headers +RUN git clone -b sdk/11.1 --single-branch https://git.videolan.org/git/ffmpeg/nv-codec-headers.git && \ + cd nv-codec-headers && make install && \ + cd .. 
&& rm -rf nv-codec-headers + +# Build ffmpeg with nvenc support +RUN git clone --depth 1 -b release/6.0 --single-branch https://github.com/FFmpeg/FFmpeg.git && \ + cd FFmpeg && \ + mkdir ffmpeg_build && cd ffmpeg_build && \ + ../configure \ + --enable-cuda \ + --enable-shared \ + --enable-decoder=aac \ + --enable-decoder=h264 \ + --enable-decoder=h264_cuvid \ + --enable-decoder=rawvideo \ + --enable-indev=lavfi \ + --enable-encoder=libx264 \ + --enable-encoder=h264_nvenc \ + --enable-demuxer=mov \ + --enable-muxer=mp4 \ + --enable-filter=scale \ + --enable-filter=testsrc2 \ + --enable-protocol=file \ + --enable-protocol=https \ + --enable-network \ + --enable-protocol=tcp \ + --enable-protocol=udp \ + --enable-protocol=rtp \ + --enable-demuxer=rtsp \ + --enable-gnutls \ + --enable-shared \ + --enable-gpl \ + --enable-nonfree \ + --enable-libx264 \ + --enable-nvenc \ + --enable-cuvid \ + --enable-nvdec \ + --disable-static \ + --disable-doc \ + --extra-cflags='-I/usr/local/cuda/include' \ + --extra-ldflags='-L/usr/local/cuda/lib64' \ + --extra-libs=-lpthread \ + --nvccflags="-arch=sm_60 \ + -gencode=arch=compute_60,code=sm_60 \ + -gencode=arch=compute_61,code=sm_61 \ + -gencode=arch=compute_70,code=sm_70 \ + -gencode=arch=compute_75,code=sm_75 \ + -gencode=arch=compute_80,code=sm_80 \ + -gencode=arch=compute_86,code=sm_86 \ + -gencode=arch=compute_89,code=sm_89 \ + -gencode=arch=compute_89,code=compute_89" && \ + make clean && make -j$(nproc) && make install && ldconfig && \ + cd ../.. 
&& rm -rf FFmpeg + +# Install torch +RUN python3 -m pip install \ + --upgrade \ + --no-cache \ + pip \ + wheel \ + setuptools \ + packaging \ + ninja && \ + ## Install pytorch and submodules + CUDA_VER=${CUDA_VERSION%.*} && CUDA_VER=${CUDA_VER//./} && \ + python3 -m pip install \ + --no-cache \ + torch==${TORCH_VERSION} \ + torchvision==${TORCHVISION_VERSION} \ + torchaudio==${TORCH_VERSION} \ + --index-url https://download.pytorch.org/whl/cu${CUDA_VER} + +# Create working directory +WORKDIR /usr/src/ + +# Install OpenCV with CUDA +RUN python3 -m pip uninstall \ + --yes \ + opencv-contrib-python \ + opencv-python-headless \ + opencv-python && \ + ln -s /usr/include/x86_64-linux-gnu/cudnn_version_v8.h /usr/include/x86_64-linux-gnu/cudnn_version.h && \ + git clone --depth 1 -b ${OPENCV_VERSION%.*} https://github.com/opencv/opencv.git /usr/src/opencv && \ + git clone --depth 1 -b ${OPENCV_VERSION%.*} https://github.com/opencv/opencv_contrib.git /usr/src/opencv_contrib && \ + cd /usr/src/opencv && \ + mkdir build && \ + cd build && \ + cmake \ + -D CPACK_BINARY_DEB=ON \ + -D BUILD_EXAMPLES=OFF \ + -D INSTALL_C_EXAMPLES=OFF \ + -D BUILD_opencv_cudacodec=ON \ + -D BUILD_opencv_python2=OFF \ + -D BUILD_opencv_python3=ON \ + -D BUILD_opencv_java=OFF \ + -D CMAKE_BUILD_TYPE=RELEASE \ + -D CMAKE_INSTALL_PREFIX=/usr/local \ + -D CUDA_ARCH_BIN=6.0,6.1,7.0,7.5,8.0,8.6,8.9 \ + -D CUDA_ARCH_PTX= \ + -D ENABLE_FAST_MATH=ON \ + -D CUDA_FAST_MATH=ON \ + -D CUDNN_INCLUDE_DIR=/usr/include/x86_64-linux-gnu \ + -D EIGEN_INCLUDE_PATH=/usr/include/eigen3 \ + -D WITH_EIGEN=ON \ + -D ENABLE_NEON=OFF \ + -D OPENCV_DNN_CUDA=ON \ + -D OPENCV_ENABLE_NONFREE=ON \ + -D OPENCV_EXTRA_MODULES_PATH=/usr/src/opencv_contrib/modules \ + -D OPENCV_GENERATE_PKGCONFIG=ON \ + -D WITH_CUBLAS=ON \ + -D WITH_CUDA=ON \ + -D WITH_CUDNN=ON \ + -D WITH_GSTREAMER=ON \ + -D WITH_LIBV4L=ON \ + -D WITH_OPENGL=ON \ + -D WITH_OPENCL=OFF \ + -D WITH_IPP=OFF \ + -D WITH_TBB=ON \ + -D WITH_TIFF=ON \ + -D WITH_JPEG=ON \ + 
-D WITH_PNG=ON \ + -D BUILD_PERF_TESTS=OFF \ + -D BUILD_TESTS=OFF \ + -D WITH_QT=OFF \ + -D BUILD_DOCS=OFF \ + .. && \ + make -j$(nproc) && \ + make install && \ + ldconfig diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..be3f7b2 --- /dev/null +++ b/LICENSE @@ -0,0 +1,661 @@ + GNU AFFERO GENERAL PUBLIC LICENSE + Version 3, 19 November 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The GNU Affero General Public License is a free, copyleft license for +software and other kinds of works, specifically designed to ensure +cooperation with the community in the case of network server software. + + The licenses for most software and other practical works are designed +to take away your freedom to share and change the works. By contrast, +our General Public Licenses are intended to guarantee your freedom to +share and change all versions of a program--to make sure it remains free +software for all its users. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +them if you wish), that you receive source code or can get it if you +want it, that you can change the software or use pieces of it in new +free programs, and that you know you can do these things. + + Developers that use our General Public Licenses protect your rights +with two steps: (1) assert copyright on the software, and (2) offer +you this License which gives you legal permission to copy, distribute +and/or modify the software. + + A secondary benefit of defending all users' freedom is that +improvements made in alternate versions of the program, if they +receive widespread use, become available for other developers to +incorporate. 
Many developers of free software are heartened and +encouraged by the resulting cooperation. However, in the case of +software used on network servers, this result may fail to come about. +The GNU General Public License permits making a modified version and +letting the public access it on a server without ever releasing its +source code to the public. + + The GNU Affero General Public License is designed specifically to +ensure that, in such cases, the modified source code becomes available +to the community. It requires the operator of a network server to +provide the source code of the modified version running there to the +users of that server. Therefore, public use of a modified version, on +a publicly accessible server, gives the public access to the source +code of the modified version. + + An older license, called the Affero General Public License and +published by Affero, was designed to accomplish similar goals. This is +a different license, not a version of the Affero GPL, but Affero has +released a new version of the Affero GPL which permits relicensing under +this license. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU Affero General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. 
+ + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. + + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. 
+ + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. 
The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. + + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. 
+ + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. + + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. 
+ + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. + + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. 
This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. + + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. 
For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. + + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. + + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. 
Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. 
+ + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. 
+ + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. + + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. 
Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. 
+ + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. 
+ + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. 
You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Remote Network Interaction; Use with the GNU General Public License. 
+ + Notwithstanding any other provision of this License, if you modify the +Program, your modified version must prominently offer all users +interacting with it remotely through a computer network (if your version +supports such interaction) an opportunity to receive the Corresponding +Source of your version by providing access to the Corresponding Source +from a network server at no charge, through some standard or customary +means of facilitating copying of software. This Corresponding Source +shall include the Corresponding Source for any work covered by version 3 +of the GNU General Public License that is incorporated pursuant to the +following paragraph. + + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU General Public License into a single +combined work, and to convey the resulting work. The terms of this +License will continue to apply to the part which is the covered work, +but the work with which it is combined will remain governed by version +3 of the GNU General Public License. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU Affero General Public License from time to time. Such new versions +will be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU Affero General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU Affero General Public License, you may choose any version ever published +by the Free Software Foundation. 
+ + If the Program specifies that a proxy can decide which future +versions of the GNU Affero General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. 
+ + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +state the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + + Copyright (C) + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + +Also add information on how to contact you by electronic and paper mail. + + If your software can interact with users remotely through a computer +network, you should also make sure that it provides a way for users to +get its source. For example, if your program is a web application, its +interface could display a "Source" link that leads users to an archive +of the code. 
There are many ways you could offer source, and different
+solutions will be better for different programs; see section 13 for the
+specific requirements.
+
+  You should also get your employer (if you work as a programmer) or school,
+if any, to sign a "copyright disclaimer" for the program, if necessary.
+For more information on this, and how to apply and follow the GNU AGPL, see
+<https://www.gnu.org/licenses/>.
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..ccb6979
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,25 @@
+services:
+  cv_backend:
+    build:
+      context: .
+      dockerfile: Dockerfile.cuda
+    image: x86_64/stream_tools:1.0.0
+    command: bash
+    stdin_open: true # docker run -i
+    tty: true # docker run -t
+    container_name: stream_tools
+    shm_size: '2gb'
+    deploy:
+      resources:
+        limits:
+          memory: 12G
+        reservations:
+          memory: 10G
+          devices:
+            - driver: nvidia
+              device_ids: ['0']
+              capabilities: [gpu]
+    environment:
+      - OMP_NUM_THREADS=1
+      - MKL_THREADING_LAYER=GNU
+      - NVIDIA_DRIVER_CAPABILITIES=video,compute,utility
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..e560e24
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,112 @@
+# Overview:
+# This pyproject.toml file manages the build, packaging, and distribution of the stream-tools library.
+# It defines essential project metadata, dependencies, and settings used to develop and deploy the library.
+
+# Key Sections:
+# - [build-system]: Specifies the build requirements and backend (e.g., setuptools, wheel).
+# - [project]: Includes details like name, version, description, authors, dependencies and more.
+# - [project.optional-dependencies]: Provides additional, optional packages for extended features.
+# - [tool.*]: Configures settings for various tools (pytest, yapf, etc.) used in the project.
+ +# Installation: +# The stream-tools library can be installed using the command: 'pip install stream_tools' +# For development purposes, you can install the package in editable mode with: 'pip install -e .' +# This approach allows for real-time code modifications without the need for re-installation. + +[build-system] +requires = ["setuptools>=43.0.0", "wheel"] +build-backend = "setuptools.build_meta" + +# Project settings ----------------------------------------------------------------------------------------------------- +[project] +name = "stream_tools" +dynamic = ["version"] +description = "Tools for real-time AI powered video analytics." +readme = "README.md" +requires-python = ">=3.8" +license = { "text" = "AGPL-3.0" } +keywords = ["machine-learning", "deep-learning", "computer-vision", "real-time"] +authors = [ + { name = "Aleksandr Nevarko" }, + { name = "Ilya Basharov" }, +] +maintainers = [ + { name = "Aleksandr Nevarko" }, + { name = "Ilya Basharov" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Software Development", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Scientific/Engineering :: Image Recognition", + "Operating System :: POSIX :: Linux", +] + +# Required dependencies ------------------------------------------------------------------------------------------------ +dependencies = [ + "opencv-python-headless>=4.6.0", + "numpy>=1.26.4", + "numba>=0.59.1", +] + +# Optional dependencies ------------------------------------------------------------------------------------------------ 
+[project.optional-dependencies] +dev = [ + "ipython", + "pre-commit", + "pytest", + "pytest-cov", + "ipdb", +] +export = [ + "onnx>=1.12.0", # ONNX export +] + +[project.urls] +"Bug Reports" = "https://github.com/nkb-tech/stream-tools/issues" +"Source" = "https://github.com/nkb-tech/stream-tools.git" + +# Tools settings ------------------------------------------------------------------------------------------------------- +[tool.setuptools] # configuration specific to the `setuptools` build backend. +packages = { find = { where = ["."], include = ["stream_tools", "stream_tools.*"] } } +package-data = { "stream_tools" = ["**/*.yaml"], "stream_tools.assets" = ["*.jpg"] } + +[tool.setuptools.dynamic] +version = { attr = "stream_tools.__version__" } + +[tool.isort] +line_length = 120 +multi_line_output = 0 + +[tool.yapf] +based_on_style = "pep8" +spaces_before_comment = 2 +column_limit = 120 +coalesce_brackets = true +spaces_around_power_operator = true +space_between_ending_comma_and_closing_bracket = true +split_before_closing_bracket = false +split_before_first_argument = false + +[tool.ruff] +line-length = 120 + +[tool.docformatter] +wrap-summaries = 120 +wrap-descriptions = 120 +in-place = true +pre-summary-newline = true +close-quotes-on-newline = true + +[tool.codespell] +ignore-words-list = "crate,nd,ned,strack,dota,ane,segway,fo,gool,winn,commend,bloc,nam,afterall" +skip = '*.pt,*.pth,*.torchscript,*.onnx,*.tflite,*.pb,*.bin,*.param,*.mlmodel,*.engine,*.npy,*.data*,*.csv,*pnnx*,*venv*,*translat*,__pycache__*,*.ico,*.jpg,*.png,*.mp4,*.mov,/runs,/.git,./docs/??/*.md,./docs/mkdocs_??.yml' diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index b8157de..0000000 --- a/requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -numpy -opencv-python -supervision -ipdb -boxmot \ No newline at end of file diff --git a/setup.py b/setup.py deleted file mode 100644 index ef5161e..0000000 --- a/setup.py +++ /dev/null @@ -1,23 +0,0 @@ -from setuptools import 
setup, find_packages - -with open('requirements.txt') as f: - packages = f.read().splitlines() - -AUTHOR = 'Aleksandr Nevarko' -AUTHOR_EMAIL = 'anevarko@mail.ru' - -if __name__ == "__main__": - setup( - name='stream_tools', - version="0.0.1", - packages=find_packages(exclude=['tests', 'scripts']), - url='https://github.com/nkb-tech/stream-tools', - license='', - author=AUTHOR, - author_email=AUTHOR_EMAIL, - description='', - install_requires=packages, - # tests_require=['pytest'], - include_package_data=True, - python_requires=">=3.9" - ) diff --git a/stream_tools/__init__.py b/stream_tools/__init__.py index e69de29..b8023d8 100644 --- a/stream_tools/__init__.py +++ b/stream_tools/__init__.py @@ -0,0 +1 @@ +__version__ = '0.0.1' diff --git a/stream_tools/action_tracker/__init__.py b/stream_tools/action_tracker/__init__.py index e7ce1d7..5f50331 100644 --- a/stream_tools/action_tracker/__init__.py +++ b/stream_tools/action_tracker/__init__.py @@ -1 +1 @@ -from stream_tools.action_tracker.movement_tracker import MultiCameraMovementTracker, MovementTracker \ No newline at end of file +from stream_tools.action_tracker.movement_tracker import MovementTracker, MultiCameraMovementTracker diff --git a/stream_tools/action_tracker/movement_tracker.py b/stream_tools/action_tracker/movement_tracker.py index b913f68..d7551b8 100644 --- a/stream_tools/action_tracker/movement_tracker.py +++ b/stream_tools/action_tracker/movement_tracker.py @@ -1,12 +1,9 @@ from datetime import datetime + class MovementTracker: - def __init__(self, - object_id, - timestamp, - movement_threshold=5, - log_file='movement_log.txt', - disappear_time=10): + + def __init__(self, object_id, timestamp, movement_threshold=5, log_file='movement_log.txt', disappear_time=10): self.object_id = object_id self.movement_threshold = movement_threshold self.log_file = log_file @@ -23,17 +20,15 @@ def update_position(self, position, timestamp): distance = self.calculate_distance(self.last_position, position) if 
distance > self.movement_threshold: self.log_event( - f"{self.object_id} moving {self.format_time(self.last_time)} : {self.format_time(timestamp)}" - ) + f'{self.object_id} moving {self.format_time(self.last_time)} : {self.format_time(timestamp)}') else: self.log_event( - f"{self.object_id} stationary {self.format_time(self.last_time)} : {self.format_time(timestamp)}" - ) + f'{self.object_id} stationary {self.format_time(self.last_time)} : {self.format_time(timestamp)}') moving = False self.last_position = position self.last_time = timestamp return moving - + # def update_position(self, position, timestamp): # if self.last_position is not None: # distance = self.calculate_distance(self.last_position, position) @@ -59,7 +54,7 @@ def update_position(self, position, timestamp): def log_event(self, message): with open(self.log_file, 'a') as file: - file.write(f"{message}\n") + file.write(f'{message}\n') @staticmethod def calculate_distance(pos1, pos2): @@ -67,11 +62,11 @@ def calculate_distance(pos1, pos2): @staticmethod def format_time(timestamp): - return timestamp.isoformat("T", "milliseconds").replace( - ":", "_" - ).replace('.', '_') + return timestamp.isoformat('T', 'milliseconds').replace(':', '_').replace('.', '_') + class MultiCameraMovementTracker: + def __init__(self, movement_threshold=5, log_file='movement_log.txt'): self.movement_threshold = movement_threshold self.log_file = log_file @@ -80,11 +75,8 @@ def __init__(self, movement_threshold=5, log_file='movement_log.txt'): def update_position(self, camera_id, object_id, object_class, position, timestamp): unique_id = (camera_id, object_id, object_class) if unique_id not in self.trackers: - self.trackers[unique_id] = MovementTracker( - object_id=unique_id, - timestamp=timestamp, - movement_threshold=self.movement_threshold, - log_file=self.log_file) - return self.trackers[unique_id].update_position( - position, - timestamp) + self.trackers[unique_id] = MovementTracker(object_id=unique_id, + 
timestamp=timestamp, + movement_threshold=self.movement_threshold, + log_file=self.log_file) + return self.trackers[unique_id].update_position(position, timestamp) diff --git a/stream_tools/config/__init__.py b/stream_tools/config/__init__.py index 2a3d810..fad2702 100644 --- a/stream_tools/config/__init__.py +++ b/stream_tools/config/__init__.py @@ -1,2 +1,2 @@ -from stream_tools.config.ivi_config import IvideonConfig from stream_tools.config.base_config import BaseConfig +from stream_tools.config.ivi_config import IvideonConfig diff --git a/stream_tools/config/base_config.py b/stream_tools/config/base_config.py index 4524460..6754eb6 100644 --- a/stream_tools/config/base_config.py +++ b/stream_tools/config/base_config.py @@ -1,16 +1,15 @@ +import json import logging + import pandas as pd import requests as re -import json class BaseConfig(): + def __init__(self, cfg): pass - + @property def cam_ids(self): return self.cams.cam_name.values - - - \ No newline at end of file diff --git a/stream_tools/config/ivi_config.py b/stream_tools/config/ivi_config.py index c8d1a97..542de4d 100644 --- a/stream_tools/config/ivi_config.py +++ b/stream_tools/config/ivi_config.py @@ -1,12 +1,14 @@ +import json import logging + import pandas as pd import requests as re -import json from stream_tools.config.base_config import BaseConfig class IvideonConfig(BaseConfig): + def __init__(self, cfg): self.api_url = cfg['api_url'] self.access_token = cfg['access_token'] @@ -16,25 +18,21 @@ def __init__(self, cfg): 'cam_name': [], 'ivideon_id': [], 'address': [], - 'link': [], - } + 'link': [], } for cam_name, params in cameras.items(): self.cams['cam_name'].append(cam_name) self.cams['ivideon_id'].append(params['ivideon_id']) self.cams['address'].append(params['address']) self.cams['link'].append('') self.cams = pd.DataFrame(self.cams) - + def update_cams(self, correct_idx): self.cams = self.cams.loc[correct_idx] - + @property def links(self): return self.cams['ivideon_id'].values - + 
@property def ivideon_ids(self): return self.cams.ivideon_id.values - - - \ No newline at end of file diff --git a/stream_tools/dataloader/__init__.py b/stream_tools/dataloader/__init__.py index f293002..9023e44 100644 --- a/stream_tools/dataloader/__init__.py +++ b/stream_tools/dataloader/__init__.py @@ -1,3 +1,3 @@ from stream_tools.dataloader.base import BaseStreamLoader +from stream_tools.dataloader.ivideon_loader import GPUIvideonStreamLoader, IvideonStreamLoader from stream_tools.dataloader.opencv_loader import OpenCVLoader -from stream_tools.dataloader.ivideon_loader import IvideonStreamLoader, GPUIvideonStreamLoader \ No newline at end of file diff --git a/stream_tools/dataloader/base.py b/stream_tools/dataloader/base.py index c4583eb..e8decae 100644 --- a/stream_tools/dataloader/base.py +++ b/stream_tools/dataloader/base.py @@ -1,30 +1,27 @@ #!/usr/bin/env python -import os import logging +import os import time from threading import Thread from typing import Union import cv2 - - logger = logging.getLogger(__name__) + class BaseStreamLoader: """BaseStreamLoader, i.e. 
`#RTSP, RTMP, HTTP streams`.""" - - def __init__( - self, - sources: list, - buffer_length: Union[str, int] = 10, - vid_fps: Union[str, int] = "auto", - max_first_attempts_to_reconnect: int = 30, - first_wait_time: float = 0.1, - second_wait_time: float = 60, - **kwargs - ) -> "BaseStreamLoader": + + def __init__(self, + sources: list, + buffer_length: Union[str, int] = 10, + vid_fps: Union[str, int] = 'auto', + max_first_attempts_to_reconnect: int = 30, + first_wait_time: float = 0.1, + second_wait_time: float = 60, + **kwargs) -> 'BaseStreamLoader': """Initialize stream loading threads from sources according to arguments Args: sources: a list of links to video streams @@ -39,9 +36,7 @@ def __init__( # Save arguments self.buffer_length = buffer_length # max buffer length self.vid_fps = vid_fps - self.max_first_attempts_to_reconnect = ( - max_first_attempts_to_reconnect - ) + self.max_first_attempts_to_reconnect = (max_first_attempts_to_reconnect) self.first_wait_time = first_wait_time self.second_wait_time = second_wait_time self.running = True # running flag for Thread @@ -56,8 +51,7 @@ def __init__( self.shape = [[] for _ in range(self.n)] self.caps = [None] * self.n # video capture objects self.started = [0] * self.n - - + def initialize(self): # Create a thread for each source and start it for i, s in enumerate(self.sources): # index, source @@ -68,18 +62,13 @@ def initialize(self): daemon=True, ) self.threads[i].start() - self.new_fps = ( - min(self.fps) - if isinstance(self.vid_fps, str) and self.vid_fps == "auto" - else self.vid_fps - ) # fps alignment - logger.info("") # newline - - + self.new_fps = (min(self.fps) if isinstance(self.vid_fps, str) and self.vid_fps == 'auto' else self.vid_fps + ) # fps alignment + logger.info('') # newline + @property def bs(self): return self.__len__() - def add_source(self, source: str): i = len(self.threads) @@ -90,57 +79,45 @@ def add_source(self, source: str): self.shape.append([]) self.caps.append(None) 
self.started.append(0) - self.threads.append( - Thread( - target=self.update, - args=([i, source]), - daemon=True, - ) - ) + self.threads.append(Thread( + target=self.update, + args=([i, source]), + daemon=True, + )) self.threads[i].start() return i - + def close_source(self, source: Union[str, int]): # TODO check source and finish func thread = self.threads[source] - self.threads = self.threads[:source] + self.threads[source+1:] - self.imgs = self.imgs[:source] + self.imgs[source+1:] - self.imgs = self.imgs[:source] + self.imgs[source+1:] - self.imgs = self.imgs[:source] + self.imgs[source+1:] + self.threads = self.threads[:source] + self.threads[source + 1:] + self.imgs = self.imgs[:source] + self.imgs[source + 1:] + self.imgs = self.imgs[:source] + self.imgs[source + 1:] + self.imgs = self.imgs[:source] + self.imgs[source + 1:] if self.threads[source].is_alive(): self.threads[source].join(timeout=5) # Add timeout - def update(self, i, source): - raise NotImplementedError("Implement update function in stream loader class") - - + raise NotImplementedError('Implement update function in stream loader class') + def close(self): """Close stream loader and release resources.""" self.running = False # stop flag for Thread for thread in self.threads: if thread.is_alive(): thread.join(timeout=5) # Add timeout - for ( - cap - ) in ( - self.caps - ): # Iterate through the stored VideoCapture objects + for (cap) in (self.caps): # Iterate through the stored VideoCapture objects try: cap.release() # release video capture except Exception as e: - logger.warning( - f"WARNING ⚠️ Could not release VideoCapture object: {e}" - ) + logger.warning(f'WARNING ⚠️ Could not release VideoCapture object: {e}') # cv2.destroyAllWindows() - - + def __iter__(self): """Iterates through image feed and re-opens unresponsive streams.""" self.count = -1 return self - def __next__(self): """Returns original images for processing.""" self.count += 1 @@ -152,11 +129,7 @@ def __next__(self): for i, x in 
enumerate(self.imgs): # If image is not available if not x: - if not self.threads[i].is_alive() or cv2.waitKey( - 1 - ) == ord( - "q" - ): # q to quit + if not self.threads[i].is_alive() or cv2.waitKey(1) == ord('q'): # q to quit self.close() raise StopIteration # logger.warning(f"WARNING ⚠️ Waiting for stream {i}") @@ -172,6 +145,4 @@ def __next__(self): def __len__(self): """Return the length of the sources object.""" - return len( - self.sources - ) # 1E12 frames = 32 streams at 30 FPS for 30 years + return len(self.sources) # 1E12 frames = 32 streams at 30 FPS for 30 years diff --git a/stream_tools/dataloader/ivideon_loader.py b/stream_tools/dataloader/ivideon_loader.py index 46dc245..8c9ded2 100644 --- a/stream_tools/dataloader/ivideon_loader.py +++ b/stream_tools/dataloader/ivideon_loader.py @@ -1,40 +1,38 @@ #!/usr/bin/env python +import json import logging import time from collections import deque -import requests as re -import json from typing import Union import cv2 +import requests as re from stream_tools.dataloader import BaseStreamLoader logger = logging.getLogger(__name__) -try: - from tensor_stream import TensorStreamConverter, FourCC, Planes, FrameRate +try: + from tensor_stream import FourCC, FrameRate, Planes, TensorStreamConverter except ImportError: # package not installed, skip pass class IvideonStreamLoader(BaseStreamLoader): - - def __init__( - self, - sources: list, - buffer_length: Union[str, int] = 10, - vid_fps: Union[str, int] = "auto", - max_first_attempts_to_reconnect: int = 30, - first_wait_time: float = 0.1, - second_wait_time: float = 60, - api_url: str = "", - api_args: str = "", - access_token: str = "", - **kwargs - ) -> "IvideonStreamLoader": - + + def __init__(self, + sources: list, + buffer_length: Union[str, int] = 10, + vid_fps: Union[str, int] = 'auto', + max_first_attempts_to_reconnect: int = 30, + first_wait_time: float = 0.1, + second_wait_time: float = 60, + api_url: str = '', + api_args: str = '', + access_token: str = 
'', + **kwargs) -> 'IvideonStreamLoader': + self.api_url = api_url self.api_args = api_args self.access_token = access_token @@ -46,7 +44,7 @@ def __init__( first_wait_time, second_wait_time, ) - + def _get_ivideon_link(self, s): url = f'{self.api_url}cameras/{s}/{self.api_args}&access_token={self.access_token}' resp = re.get(url) @@ -55,12 +53,12 @@ def _get_ivideon_link(self, s): return None stream_url = json.loads(resp.text)['result']['url'] return stream_url - + def update(self, i, stream): """Read stream `i` frames in daemon thread.""" link = None attempt = 0 - st = f"{i + 1}/{self.n}: {stream}... " + st = f'{i + 1}/{self.n}: {stream}... ' w, h = 0, 0 while link is None or self.caps[i] is None: link = self._get_ivideon_link(stream) @@ -74,16 +72,12 @@ def update(self, i, stream): try: cap = cv2.VideoCapture(link) except Exception as ex: - logger.warning( - f"Video stream {i} is unresponsive on start: {ex}, reconnecting..." - ) + logger.warning(f'Video stream {i} is unresponsive on start: {ex}, reconnecting...') continue self.caps[i] = cap success, im = self.caps[i].read() # guarantee first frame if not success or im is None: - logger.warning( - f"{st}Failed to read images from {stream}" - ) + logger.warning(f'{st}Failed to read images from {stream}') attempt += 1 continue w = int(self.caps[i].get(cv2.CAP_PROP_FRAME_WIDTH)) @@ -98,58 +92,48 @@ def update(self, i, stream): buf = deque(maxlen=self.buffer_length) buf.append(im) self.imgs[i] = buf - logger.info( - f"{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)" - ) - + logger.info(f'{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)') + n, f = 0, self.frames[i] # frame number, frame array while self.running and n < (f - 1): - success = ( - cap.grab() - ) # .read() = .grab() followed by .retrieve() + success = (cap.grab()) # .read() = .grab() followed by .retrieve() im = self.imgs[i][-1] if not success: - logger.warning( - f"WARNING ⚠️ Video stream 
{i} unresponsive, please check your IP camera connection." - ) + logger.warning(f'WARNING ⚠️ Video stream {i} unresponsive, please check your IP camera connection.') if attempt < self.max_first_attempts_to_reconnect: time.sleep(self.first_wait_time) else: time.sleep(self.second_wait_time) attempt += 1 reopen_status = cap.open(self._get_ivideon_link(self.sources[i])) # re-open stream if signal was lost - logger.info( - f"Attemp to re-open video stream {i}, result: {reopen_status}" - ) + logger.info(f'Attemp to re-open video stream {i}, result: {reopen_status}') else: success, im = cap.retrieve() if not success: im = None - logger.warning( - f"WARNING ⚠️ Cannot decode image from video stream {i}. Unknown error." - ) + logger.warning(f'WARNING ⚠️ Cannot decode image from video stream {i}. Unknown error.') self.imgs[i].append(im) n += 1 else: - logger.info(f"End of stream {i}.") + logger.info(f'End of stream {i}.') class GPUIvideonStreamLoader(IvideonStreamLoader): - + def __init__( - self, + self, sources: list, buffer_length: Union[str, int] = 10, - vid_fps: Union[str, int] = "auto", + vid_fps: Union[str, int] = 'auto', max_first_attempts_to_reconnect: int = 30, first_wait_time: float = 0.1, second_wait_time: float = 60, - api_url: str = "", - api_args: str = "", - access_token: str = "", + api_url: str = '', + api_args: str = '', + access_token: str = '', cuda_device: int = 0, - ) -> "IvideonStreamLoader": - + ) -> 'IvideonStreamLoader': + self.api_url = api_url self.api_args = api_args self.access_token = access_token @@ -162,13 +146,12 @@ def __init__( first_wait_time, second_wait_time, ) - - + def update(self, i, stream): """Read stream `i` frames in daemon thread.""" link = None attempt = 0 - st = f"{i + 1}/{self.n}: {stream}... " + st = f'{i + 1}/{self.n}: {stream}... 
' w, h = 0, 0 while link is None or self.caps[i] is None: link = self._get_ivideon_link(stream) @@ -181,18 +164,16 @@ def update(self, i, stream): continue try: cap = TensorStreamConverter( - stream, + stream, cuda_device=self.cuda_device, max_consumers=1, buffer_size=self.buffer_length, framerate_mode=FrameRate.NATIVE_LOW_DELAY, - ) + ) cap.initialize() cap.start() except Exception as ex: - logger.warning( - f"Video stream {i} is unresponsive on start: {ex}, reconnecting..." - ) + logger.warning(f'Video stream {i} is unresponsive on start: {ex}, reconnecting...') continue self.caps[i] = cap w = int(self.caps[i].frame_size[0]) @@ -202,17 +183,13 @@ def update(self, i, stream): self.caps[i] = None continue try: - im = self.caps[i].read( - width=w, - height=h, - pixel_format=FourCC.RGB24, - planes_pos=Planes.PLANAR, - normalization=True - ) # guarantee first frame + im = self.caps[i].read(width=w, + height=h, + pixel_format=FourCC.RGB24, + planes_pos=Planes.PLANAR, + normalization=True) # guarantee first frame except Exception as err: - logger.warning( - f"{st}Failed to read images from {stream}: {err}" - ) + logger.warning(f'{st}Failed to read images from {stream}: {err}') attempt += 1 self.caps[i] = None continue @@ -226,53 +203,45 @@ def update(self, i, stream): buf = deque(maxlen=self.buffer_length) buf.append(im) self.imgs[i] = buf - logger.info( - f"{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)" - ) - + logger.info(f'{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)') + n, f = 0, self.frames[i] # frame number, frame array while self.running and n < (f - 1): try: - im = self.caps[i].read( - width=w, - height=h, - pixel_format=FourCC.RGB24, - planes_pos=Planes.PLANAR, - normalization=True - ) # guarantee first frame + im = self.caps[i].read(width=w, + height=h, + pixel_format=FourCC.RGB24, + planes_pos=Planes.PLANAR, + normalization=True) # guarantee first frame success = True except: im = 
self.imgs[i][-1] success = False if not success: - logger.warning( - f"WARNING ⚠️ Video stream {i} unresponsive, please check your IP camera connection." - ) + logger.warning(f'WARNING ⚠️ Video stream {i} unresponsive, please check your IP camera connection.') if attempt < self.max_first_attempts_to_reconnect: time.sleep(self.first_wait_time) else: time.sleep(self.second_wait_time) attempt += 1 self.caps[i] = TensorStreamConverter( - self._get_ivideon_link(self.sources[i]), + self._get_ivideon_link(self.sources[i]), cuda_device=self.cuda_device, max_consumers=1, buffer_size=self.buffer_length, framerate_mode=FrameRate.NATIVE_LOW_DELAY, - ) + ) try: cap.initialize() - cap.start() # re-open stream if signal was lost + cap.start() # re-open stream if signal was lost except Exception as err: self.caps[i] = None attempt += 1 continue else: err = True - logger.info( - f"Attemp to re-open video stream {i}, result: {err}" - ) + logger.info(f'Attemp to re-open video stream {i}, result: {err}') self.imgs[i].append(im) n += 1 else: - logger.info(f"End of stream {i}.") \ No newline at end of file + logger.info(f'End of stream {i}.') diff --git a/stream_tools/dataloader/opencv_loader.py b/stream_tools/dataloader/opencv_loader.py index d35a75b..6bd91b0 100644 --- a/stream_tools/dataloader/opencv_loader.py +++ b/stream_tools/dataloader/opencv_loader.py @@ -1,22 +1,22 @@ +import logging import os - -import cv2 - import time -import logging from collections import deque +import cv2 + logger = logging.getLogger(__name__) from stream_tools.dataloader import BaseStreamLoader + class OpenCVLoader(BaseStreamLoader): - + def update(self, i, source): """Read stream `i` frames in daemon thread.""" # os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "threads;2" attempt = 0 - st = f"{i + 1}/{self.n}: {source}... " + st = f'{i + 1}/{self.n}: {source}... 
' w, h = 0, 0 while self.caps[i] is None: if attempt == 0: @@ -28,17 +28,13 @@ def update(self, i, source): try: cap = cv2.VideoCapture(source) except Exception as ex: - logger.warning( - f"Video stream {i} is unresponsive on start: {ex}, reconnecting..." - ) + logger.warning(f'Video stream {i} is unresponsive on start: {ex}, reconnecting...') attempt += 1 continue self.caps[i] = cap success, im = self.caps[i].read() # guarantee first frame if not success or im is None: - logger.warning( - f"{st}Failed to read images from {source}, reconnecting..." - ) + logger.warning(f'{st}Failed to read images from {source}, reconnecting...') attempt += 1 self.caps[i] = None continue @@ -49,26 +45,20 @@ def update(self, i, source): self.caps[i] = None continue self.fps[i] = self.caps[i].get(cv2.CAP_PROP_FPS) - self.frames[i] = float("inf") + self.frames[i] = float('inf') self.shape[i] = [w, h] buf = deque(maxlen=self.buffer_length) buf.append(im) self.imgs[i] = buf - logger.info( - f"{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)" - ) + logger.info(f'{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)') attempt = 0 n, f = 0, self.frames[i] # frame number, frame array self.started[i] = 1 while self.running and n < (f - 1): # and cap.isOpened() - success = ( - cap.grab() - ) # .read() = .grab() followed by .retrieve() + success = (cap.grab()) # .read() = .grab() followed by .retrieve() im = self.imgs[i][-1] # Default to last valid image if not success: - logger.warning( - f"WARNING ⚠️ Video stream {i} unresponsive, please check your IP camera connection." - ) + logger.warning(f'WARNING ⚠️ Video stream {i} unresponsive, please check your IP camera connection.') if attempt < self.max_first_attempts_to_reconnect: time.sleep(self.first_wait_time) else: @@ -80,9 +70,7 @@ def update(self, i, source): attempt = 0 if not success: im = None - logger.warning( - f"WARNING ⚠️ Cannot decode image from video stream {i}. 
Unknown error." - ) + logger.warning(f'WARNING ⚠️ Cannot decode image from video stream {i}. Unknown error.') if attempt < self.max_first_attempts_to_reconnect: time.sleep(self.first_wait_time) else: @@ -92,4 +80,4 @@ def update(self, i, source): self.imgs[i].append(im) n += 1 else: - logger.info(f"End of stream {i}.") + logger.info(f'End of stream {i}.') diff --git a/stream_tools/model/__init__.py b/stream_tools/model/__init__.py index 29df5df..34a2102 100644 --- a/stream_tools/model/__init__.py +++ b/stream_tools/model/__init__.py @@ -1,2 +1,2 @@ +from stream_tools.model.classifier import YoloClassifier from stream_tools.model.detector import Detector -from stream_tools.model.classifier import YoloClassifier \ No newline at end of file diff --git a/stream_tools/model/classifier.py b/stream_tools/model/classifier.py index 0643c0c..3f7ed6f 100644 --- a/stream_tools/model/classifier.py +++ b/stream_tools/model/classifier.py @@ -10,9 +10,10 @@ class YoloClassifier: + def __init__(self, cfg, device): # TODO add zones - self.model = YOLO(model=cfg["model_path"], task="classify") + self.model = YOLO(model=cfg['model_path'], task='classify') self.device = device self.cfg = cfg # Dummy inference for model warmup @@ -21,27 +22,25 @@ def __init__(self, cfg, device): np.random.randint( low=0, high=255, - size=(cfg["orig_img_h"], cfg["orig_img_w"], 3), + size=(cfg['orig_img_h'], cfg['orig_img_w'], 3), dtype=np.uint8, - ) - for _ in range(cfg["inference_bs"]) - ] + ) for _ in range(cfg['inference_bs'])] self.model( source=dummy_imgs, device=self.device, - imgsz=cfg["inference_imgsz"], + imgsz=cfg['inference_imgsz'], # conf=cfg["inference_conf"], stream=False, verbose=False, half=True, ) - self.time_logging_period = cfg["time_logging_period"] + self.time_logging_period = cfg['time_logging_period'] self.n_calls = -1 @property def names(self): return self.model.names - + def __call__(self, imgs: list) -> Any: self.n_calls += 1 return self.inference(imgs) @@ -60,12 +59,12 @@ def 
inference(self, imgs: Any): h, w, _ = imgs[i].shape correct_frame_idx.append(i) if (h, w) != ( - self.cfg["orig_img_h"], - self.cfg["orig_img_w"], + self.cfg['orig_img_h'], + self.cfg['orig_img_w'], ): imgs[i] = cv2.resize( imgs[i], - (self.cfg["orig_img_w"], self.cfg["orig_img_h"]), + (self.cfg['orig_img_w'], self.cfg['orig_img_h']), # Default YOLO interpolation interpolation=cv2.INTER_AREA, ) @@ -73,7 +72,7 @@ def inference(self, imgs: Any): results = self.model( source=imgs_to_infer, device=self.device, - imgsz=self.cfg["inference_imgsz"], + imgsz=self.cfg['inference_imgsz'], # conf=self.cfg["inference_conf"], stream=False, verbose=False, @@ -88,7 +87,5 @@ def inference(self, imgs: Any): time_spent_ns = end_time_ns - start_time_ns time_spent_ms = time_spent_ns / 1e6 if self.n_calls % self.time_logging_period == 0: - logger.info( - f"Classifier inference on {len(imgs)} images took {time_spent_ms:.1f} ms" - ) - return (preds[0], confs[0]) if single else (preds, confs) \ No newline at end of file + logger.info(f'Classifier inference on {len(imgs)} images took {time_spent_ms:.1f} ms') + return (preds[0], confs[0]) if single else (preds, confs) diff --git a/stream_tools/model/detector.py b/stream_tools/model/detector.py index 78ba1f1..4f643ca 100644 --- a/stream_tools/model/detector.py +++ b/stream_tools/model/detector.py @@ -11,12 +11,13 @@ class Detector: + def __init__(self, cfg): # TODO add zones - self.model = YOLO(model=cfg["model_path"], task="detect") + self.model = YOLO(model=cfg['model_path'], task='detect') self.device = torch.device(cfg['device']) self.cfg = cfg - + def initialize(self): # Dummy inference for model warmup for _ in range(100): @@ -24,27 +25,25 @@ def initialize(self): np.random.randint( low=0, high=255, - size=(self.cfg["orig_img_h"], self.cfg["orig_img_w"], 3), + size=(self.cfg['orig_img_h'], self.cfg['orig_img_w'], 3), dtype=np.uint8, - ) - for _ in range(self.cfg["inference_bs"]) - ] + ) for _ in range(self.cfg['inference_bs'])] 
self.model( source=dummy_imgs, device=self.device, - imgsz=self.cfg["inference_imgsz"], - conf=self.cfg["inference_conf"], + imgsz=self.cfg['inference_imgsz'], + conf=self.cfg['inference_conf'], stream=False, verbose=False, half=True, ) - self.time_logging_period = self.cfg["time_logging_period"] + self.time_logging_period = self.cfg['time_logging_period'] self.n_calls = -1 @property def names(self): return self.model.names - + def __call__(self, imgs: list) -> Any: self.n_calls += 1 return self.inference(imgs) @@ -73,8 +72,8 @@ def inference(self, imgs: list): results = self.model( source=imgs_to_infer, device=self.device, - imgsz=self.cfg["inference_imgsz"], - conf=self.cfg["inference_conf"], + imgsz=self.cfg['inference_imgsz'], + conf=self.cfg['inference_conf'], stream=False, verbose=False, half=True, @@ -86,7 +85,5 @@ def inference(self, imgs: list): time_spent_ns = end_time_ns - start_time_ns time_spent_ms = time_spent_ns / 1e6 if self.n_calls % self.time_logging_period == 0: - logger.info( - f"Detector inference on {len(correct_frame_idx)} images took {time_spent_ms:.1f} ms" - ) - return dets \ No newline at end of file + logger.info(f'Detector inference on {len(correct_frame_idx)} images took {time_spent_ms:.1f} ms') + return dets diff --git a/stream_tools/pipeline/__init__.py b/stream_tools/pipeline/__init__.py index bda920b..863dbe0 100644 --- a/stream_tools/pipeline/__init__.py +++ b/stream_tools/pipeline/__init__.py @@ -1,2 +1,2 @@ +from stream_tools.pipeline.mutlitrack import MultiTrackWorker from stream_tools.pipeline.pipeline_base import BaseWorker -from stream_tools.pipeline.mutlitrack import MultiTrackWorker \ No newline at end of file diff --git a/stream_tools/pipeline/mutlitrack.py b/stream_tools/pipeline/mutlitrack.py index a329c5b..bd2deaa 100644 --- a/stream_tools/pipeline/mutlitrack.py +++ b/stream_tools/pipeline/mutlitrack.py @@ -1,32 +1,33 @@ -import cv2 -import numpy as np import argparse -import yaml -import torch -from pathlib import Path 
-from datetime import datetime -from time import perf_counter_ns -import pytz import logging import warnings from collections import defaultdict +from datetime import datetime +from pathlib import Path from queue import Empty, Queue from threading import Event, Thread -warnings.filterwarnings("ignore") +from time import perf_counter_ns + +import cv2 +import numpy as np +import pytz +import torch +import yaml + +warnings.filterwarnings('ignore') import boxmot as bx -from stream_tools.pipeline import BaseWorker +from stream_tools.config import BaseConfig from stream_tools.dataloader import BaseStreamLoader from stream_tools.model import Detector -from stream_tools.config import BaseConfig +from stream_tools.pipeline import BaseWorker -TIMEZONE = pytz.timezone( - "Europe/Moscow" -) # UTC, Asia/Shanghai, Europe/Berlin +TIMEZONE = pytz.timezone('Europe/Moscow') # UTC, Asia/Shanghai, Europe/Berlin logger = logging.getLogger(__name__) + def timetz(*args): return datetime.now(TIMEZONE).timetuple() @@ -34,7 +35,7 @@ def timetz(*args): class MultiTrackWorker(BaseWorker): _TIMEOUT = 2 - def __init__(self, + def __init__(self, dataloader: BaseStreamLoader, detector: Detector, tracker_cfg: dict, @@ -47,45 +48,33 @@ def __init__(self, self.cams_cfg = cams_cfg self.inf_cfg = inf_cfg # Streams - logger.info(f"Initializing stream loader...") + logger.info(f'Initializing stream loader...') self.dataloader = dataloader self.dataloader.initialize() - logger.info(f"Stream loader initialized") + logger.info(f'Stream loader initialized') # Models self.detector = detector self.names = self.detector.names self.detector.initialize() - logger.info(f"Detector initialized") + logger.info(f'Detector initialized') # Trackers - self.trackers = { - cam_id: bx.create_tracker(**tracker_cfg) - for cam_id in self.cams_cfg.cam_ids - } + self.trackers = {cam_id: bx.create_tracker(**tracker_cfg) for cam_id in self.cams_cfg.cam_ids} self.poses = {cam_id: dict() for cam_id in self.cams_cfg.cam_ids} - 
logger.info(f"Trackers initialized") + logger.info(f'Trackers initialized') # Debug self.debug = debug if self.debug: - self.inf_cfg["debug"]["save_img_path"] = Path( - self.inf_cfg["debug"]["save_img_path"] - ) / datetime.now().isoformat("T", "seconds").replace( - ":", "_" - ) - logger.info( - f"Debug mode: ON, saving data to {self.inf_cfg['debug']['save_img_path']}" - ) - save_img_path = self.inf_cfg["debug"]["save_img_path"] - (save_img_path / "images").mkdir( - exist_ok=True, parents=True - ) - (save_img_path / "labels").mkdir( - exist_ok=True, parents=True - ) + self.inf_cfg['debug']['save_img_path'] = Path( + self.inf_cfg['debug']['save_img_path']) / datetime.now().isoformat('T', 'seconds').replace(':', '_') + logger.info(f"Debug mode: ON, saving data to {self.inf_cfg['debug']['save_img_path']}") + save_img_path = self.inf_cfg['debug']['save_img_path'] + (save_img_path / 'images').mkdir(exist_ok=True, parents=True) + (save_img_path / 'labels').mkdir(exist_ok=True, parents=True) for cam_id in self.cams_cfg.cam_ids: - (save_img_path / "crops" / str(cam_id)).mkdir(exist_ok=True, parents=True) + (save_img_path / 'crops' / str(cam_id)).mkdir(exist_ok=True, parents=True) self.save_img_path = save_img_path else: - logger.info(f"Debug mode: OFF") + logger.info(f'Debug mode: OFF') super(MultiTrackWorker, self).__init__(send, debug) @@ -94,8 +83,7 @@ def pipeline(self, imgs, timestamp): track_res = self.run_trackers(dets, imgs) results = { 'tracks': track_res, - 'dets': dets, - } + 'dets': dets, } return results def run_trackers(self, dets, imgs): @@ -110,21 +98,18 @@ def run_trackers(self, dets, imgs): if tracks.shape[0] != 0: xyxys = tracks[:, 0:4] xywhn = xyxys.copy() - xywhn[:, 0] = np.sum(xyxys[:, [0,2]], axis=1) / 2 / img_w - xywhn[:, 1] = np.sum(xyxys[:, [1,3]], axis=1) / 2 / img_h + xywhn[:, 0] = np.sum(xyxys[:, [0, 2]], axis=1) / 2 / img_w + xywhn[:, 1] = np.sum(xyxys[:, [1, 3]], axis=1) / 2 / img_h xywhn[:, 2] = (xyxys[:, 2] - xyxys[:, 0]) / img_w xywhn[:, 3] 
= (xyxys[:, 3] - xyxys[:, 1]) / img_h - tracks[:, 0:4] = xywhn # [xcn, ycn, wn, hn, id, conf, class, index (from detections)] + tracks[:, 0:4] = xywhn # [xcn, ycn, wn, hn, id, conf, class, index (from detections)] track_res[cam_id] = (tracks, i) - + return track_res - def log_debug(self, timestamp, results, imgs): # TODO rewrite - timestamp_str = timestamp.isoformat("T", "milliseconds").replace( - ":", "_" - ).replace('.', '_') + timestamp_str = timestamp.isoformat('T', 'milliseconds').replace(':', '_').replace('.', '_') tracks = results['tracks'] dets = results['dets'] for i, (tracks, cam_idx) in enumerate(tracks): @@ -132,60 +117,40 @@ def log_debug(self, timestamp, results, imgs): img_h, img_w, _ = img.shape try: cv2.imwrite( - str( - self.save_img_path - / "images" - / f"{self.cams_cfg.cam_ids[cam_idx]}_{timestamp_str}.jpg" - ), + str(self.save_img_path / 'images' / f'{self.cams_cfg.cam_ids[cam_idx]}_{timestamp_str}.jpg'), img, ) except Exception as e: logger.critical(img.shape, e) labels_str = [] - for track in tracks: # [xc, yc, wn, hn, id, conf, class, index (from detections)] + for track in tracks: # [xc, yc, wn, hn, id, conf, class, index (from detections)] xcn, ycn, wn, hn, id_obj, conf, label, ind = track - crop = img[ - int((ycn - hn)*img_h):int((ycn + hn)*img_h), - int((xcn - wn)*img_w):int((xcn + wn)*img_w), - ] + crop = img[int((ycn - hn) * img_h):int((ycn + hn) * img_h), + int((xcn - wn) * img_w):int((xcn + wn) * img_w), ] labels_str.append(f'{int(label)} {xcn} {ycn} {wn} {hn} {conf}\n') - Path(self.save_img_path - / 'crops' - / f'{self.cams_cfg.cam_ids[cam_idx]}' - / f'{self.names[label]}_{id_obj}').mkdir(exist_ok=True, parents=True) - try: + Path(self.save_img_path / 'crops' / f'{self.cams_cfg.cam_ids[cam_idx]}' / + f'{self.names[label]}_{id_obj}').mkdir(exist_ok=True, parents=True) + try: cv2.imwrite( - str( - self.save_img_path - / 'crops' - / f'{self.cams_cfg.cam_ids[cam_idx]}' - / f'{self.names[label]}_{id_obj}' - / f'{timestamp_str}.jpg' - ), 
- crop - ) + str(self.save_img_path / 'crops' / f'{self.cams_cfg.cam_ids[cam_idx]}' / + f'{self.names[label]}_{id_obj}' / f'{timestamp_str}.jpg'), crop) except Exception as e: logger.warning(crop.shape, track, img.shape, e) - with ( - self.save_img_path - / "labels" - / f"{self.cams_cfg.cam_ids[cam_idx]}_{timestamp_str}.txt" - ).open("w") as f: + with (self.save_img_path / 'labels' / + f'{self.cams_cfg.cam_ids[cam_idx]}_{timestamp_str}.txt').open('w') as f: f.writelines(labels_str) def main(): parser = argparse.ArgumentParser() - parser.add_argument("-cfg", type=str, help="Inference config path") - parser.add_argument("-cam", type=str, help="Camera config path") + parser.add_argument('-cfg', type=str, help='Inference config path') + parser.add_argument('-cam', type=str, help='Camera config path') + parser.add_argument('-log', '--log_path', type=str, help='Logging path') parser.add_argument( - "-log", "--log_path", type=str, help="Logging path" - ) - parser.add_argument( - "-d", - "--debug", - action="store_true", - help="Debug mode, that save images and predictions", + '-d', + '--debug', + action='store_true', + help='Debug mode, that save images and predictions', ) args = parser.parse_args() cfg_path = args.cfg @@ -196,18 +161,17 @@ def main(): logging.basicConfig( level=logging.DEBUG, filename=f"{log_path}/{TIMEZONE.localize(datetime.now()).isoformat('T', 'seconds').replace(':', '_')}_logs.log", - filemode="w", - format="%(asctime)s %(levelname)s %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", + filemode='w', + format='%(asctime)s %(levelname)s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', ) - with open(cfg_path, "r") as f: + with open(cfg_path, 'r') as f: cfg = yaml.safe_load(f) - with open(cams, "r") as f: + with open(cams, 'r') as f: cams_cfg = yaml.safe_load(f) worker = MultiTrackWorker(cfg, cams_cfg, debug) worker.run() - + if __name__ == '__main__': main() - \ No newline at end of file diff --git a/stream_tools/pipeline/pipeline_base.py 
b/stream_tools/pipeline/pipeline_base.py index ded9ce0..5173a88 100644 --- a/stream_tools/pipeline/pipeline_base.py +++ b/stream_tools/pipeline/pipeline_base.py @@ -1,19 +1,20 @@ import argparse -from pathlib import Path -from datetime import datetime -import pytz import logging import warnings +from datetime import datetime +from pathlib import Path from queue import Empty, Queue from threading import Event, Thread -warnings.filterwarnings("ignore") -TIMEZONE = pytz.timezone( - "Europe/Moscow" -) # UTC, Asia/Shanghai, Europe/Berlin +import pytz + +warnings.filterwarnings('ignore') + +TIMEZONE = pytz.timezone('Europe/Moscow') # UTC, Asia/Shanghai, Europe/Berlin logger = logging.getLogger(__name__) + def timetz(*args): return datetime.now(TIMEZONE).timetuple() @@ -21,9 +22,7 @@ def timetz(*args): class BaseWorker: _TIMEOUT = 2 - def __init__(self, - send: bool = False, - debug: bool = False): + def __init__(self, send: bool = False, debug: bool = False): # Init separate process self.queue = Queue(maxsize=30) self.done = Event() @@ -74,16 +73,17 @@ def run_on_images(self, imgs): self.log_debug(timestamp, results, imgs) if self.send: self.send_results(timestamp, results, imgs) - + except Exception: - import ipdb; ipdb.set_trace() - + import ipdb + ipdb.set_trace() + def pipeline(self, imgs): raise NotImplementedError def log_debug(self, timestamp, results, imgs): raise NotImplementedError - + def send_results(timestamp, results, imgs): raise NotImplementedError @@ -91,19 +91,17 @@ def send_results(timestamp, results, imgs): def base_main(): parser = argparse.ArgumentParser() parser.add_argument( - "-s", - "--send", - action="store_true", - help="Send results via API or put them into DB", + '-s', + '--send', + action='store_true', + help='Send results via API or put them into DB', ) + parser.add_argument('-log', '--log_path', type=str, help='Logging path') parser.add_argument( - "-log", "--log_path", type=str, help="Logging path" - ) - parser.add_argument( - "-d", - 
"--debug", - action="store_true", - help="Debug mode, that save images and predictions", + '-d', + '--debug', + action='store_true', + help='Debug mode, that save images and predictions', ) args = parser.parse_args() send = args.send @@ -113,14 +111,13 @@ def base_main(): logging.basicConfig( level=logging.DEBUG, filename=f"{log_path}/{TIMEZONE.localize(datetime.now()).isoformat('T', 'seconds').replace(':', '_')}_logs.log", - filemode="w", - format="%(asctime)s %(levelname)s %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", + filemode='w', + format='%(asctime)s %(levelname)s %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', ) worker = BaseWorker(send, debug) worker.run() - + if __name__ == '__main__': base_main() - \ No newline at end of file From 0e4bb327b9ba5e51b57c0abcba2cd100f1ff7c02 Mon Sep 17 00:00:00 2001 From: ilyabasharov Date: Fri, 19 Apr 2024 13:51:01 +0300 Subject: [PATCH 2/7] [Fix] precommit --- .pre-commit-config.yaml | 6 +++++- stream_tools/action_tracker/__init__.py | 1 - stream_tools/action_tracker/movement_tracker.py | 3 --- stream_tools/config/__init__.py | 2 -- stream_tools/config/base_config.py | 7 ------- stream_tools/config/ivi_config.py | 4 ---- stream_tools/dataloader/__init__.py | 3 --- stream_tools/dataloader/base.py | 4 ---- stream_tools/dataloader/opencv_loader.py | 1 - stream_tools/model/__init__.py | 2 -- stream_tools/model/detector.py | 1 - stream_tools/pipeline/__init__.py | 2 -- stream_tools/pipeline/mutlitrack.py | 3 --- stream_tools/pipeline/pipeline_base.py | 4 ---- 14 files changed, 5 insertions(+), 38 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b14540d..af0d2c5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,6 +3,11 @@ repos: rev: 7.0.0 hooks: - id: flake8 + - repo: https://github.com/PyCQA/autoflake + rev: v2.2.1 + hooks: + - id: autoflake + args: [--remove-all-unused-imports, --in-place] - repo: https://github.com/PyCQA/isort.git rev: 5.13.2 hooks: @@ -18,7 +23,6 @@ repos: 
- id: trailing-whitespace - id: check-yaml - id: end-of-file-fixer - - id: requirements-txt-fixer - id: double-quote-string-fixer - id: check-merge-conflict - id: fix-encoding-pragma diff --git a/stream_tools/action_tracker/__init__.py b/stream_tools/action_tracker/__init__.py index 5f50331..e69de29 100644 --- a/stream_tools/action_tracker/__init__.py +++ b/stream_tools/action_tracker/__init__.py @@ -1 +0,0 @@ -from stream_tools.action_tracker.movement_tracker import MovementTracker, MultiCameraMovementTracker diff --git a/stream_tools/action_tracker/movement_tracker.py b/stream_tools/action_tracker/movement_tracker.py index d7551b8..8251f5e 100644 --- a/stream_tools/action_tracker/movement_tracker.py +++ b/stream_tools/action_tracker/movement_tracker.py @@ -1,6 +1,3 @@ -from datetime import datetime - - class MovementTracker: def __init__(self, object_id, timestamp, movement_threshold=5, log_file='movement_log.txt', disappear_time=10): diff --git a/stream_tools/config/__init__.py b/stream_tools/config/__init__.py index fad2702..e69de29 100644 --- a/stream_tools/config/__init__.py +++ b/stream_tools/config/__init__.py @@ -1,2 +0,0 @@ -from stream_tools.config.base_config import BaseConfig -from stream_tools.config.ivi_config import IvideonConfig diff --git a/stream_tools/config/base_config.py b/stream_tools/config/base_config.py index 6754eb6..c9b1c5c 100644 --- a/stream_tools/config/base_config.py +++ b/stream_tools/config/base_config.py @@ -1,10 +1,3 @@ -import json -import logging - -import pandas as pd -import requests as re - - class BaseConfig(): def __init__(self, cfg): diff --git a/stream_tools/config/ivi_config.py b/stream_tools/config/ivi_config.py index 542de4d..5417664 100644 --- a/stream_tools/config/ivi_config.py +++ b/stream_tools/config/ivi_config.py @@ -1,8 +1,4 @@ -import json -import logging - import pandas as pd -import requests as re from stream_tools.config.base_config import BaseConfig diff --git a/stream_tools/dataloader/__init__.py 
b/stream_tools/dataloader/__init__.py index 9023e44..e69de29 100644 --- a/stream_tools/dataloader/__init__.py +++ b/stream_tools/dataloader/__init__.py @@ -1,3 +0,0 @@ -from stream_tools.dataloader.base import BaseStreamLoader -from stream_tools.dataloader.ivideon_loader import GPUIvideonStreamLoader, IvideonStreamLoader -from stream_tools.dataloader.opencv_loader import OpenCVLoader diff --git a/stream_tools/dataloader/base.py b/stream_tools/dataloader/base.py index e8decae..6c613eb 100644 --- a/stream_tools/dataloader/base.py +++ b/stream_tools/dataloader/base.py @@ -1,7 +1,4 @@ -#!/usr/bin/env python - import logging -import os import time from threading import Thread from typing import Union @@ -89,7 +86,6 @@ def add_source(self, source: str): def close_source(self, source: Union[str, int]): # TODO check source and finish func - thread = self.threads[source] self.threads = self.threads[:source] + self.threads[source + 1:] self.imgs = self.imgs[:source] + self.imgs[source + 1:] self.imgs = self.imgs[:source] + self.imgs[source + 1:] diff --git a/stream_tools/dataloader/opencv_loader.py b/stream_tools/dataloader/opencv_loader.py index 6bd91b0..5af3dc0 100644 --- a/stream_tools/dataloader/opencv_loader.py +++ b/stream_tools/dataloader/opencv_loader.py @@ -1,5 +1,4 @@ import logging -import os import time from collections import deque diff --git a/stream_tools/model/__init__.py b/stream_tools/model/__init__.py index 34a2102..e69de29 100644 --- a/stream_tools/model/__init__.py +++ b/stream_tools/model/__init__.py @@ -1,2 +0,0 @@ -from stream_tools.model.classifier import YoloClassifier -from stream_tools.model.detector import Detector diff --git a/stream_tools/model/detector.py b/stream_tools/model/detector.py index 4f643ca..3fc42f1 100644 --- a/stream_tools/model/detector.py +++ b/stream_tools/model/detector.py @@ -2,7 +2,6 @@ from time import perf_counter_ns from typing import Any -import cv2 import numpy as np import torch from ultralytics import YOLO diff --git 
a/stream_tools/pipeline/__init__.py b/stream_tools/pipeline/__init__.py index 863dbe0..e69de29 100644 --- a/stream_tools/pipeline/__init__.py +++ b/stream_tools/pipeline/__init__.py @@ -1,2 +0,0 @@ -from stream_tools.pipeline.mutlitrack import MultiTrackWorker -from stream_tools.pipeline.pipeline_base import BaseWorker diff --git a/stream_tools/pipeline/mutlitrack.py b/stream_tools/pipeline/mutlitrack.py index bd2deaa..60a5402 100644 --- a/stream_tools/pipeline/mutlitrack.py +++ b/stream_tools/pipeline/mutlitrack.py @@ -4,9 +4,6 @@ from collections import defaultdict from datetime import datetime from pathlib import Path -from queue import Empty, Queue -from threading import Event, Thread -from time import perf_counter_ns import cv2 import numpy as np diff --git a/stream_tools/pipeline/pipeline_base.py b/stream_tools/pipeline/pipeline_base.py index 5173a88..cb2e860 100644 --- a/stream_tools/pipeline/pipeline_base.py +++ b/stream_tools/pipeline/pipeline_base.py @@ -31,13 +31,9 @@ def __init__(self, send: bool = False, debug: bool = False): self.send = send # Streams - pass # Models - pass # Trackers - pass # Debug - pass self.pool.start() From a9ee7adcb63545412d83cd10686137a13d19454e Mon Sep 17 00:00:00 2001 From: ilyabasharov Date: Sat, 4 May 2024 14:56:04 +0300 Subject: [PATCH 3/7] [Enhance] Add gpu loader --- docker-compose.yml | 2 + pyproject.toml | 2 + stream_tools/dataloader/__init__.py | 9 ++ stream_tools/dataloader/base.py | 52 ++++++++---- stream_tools/dataloader/opencv_loader.py | 101 ++++++++++++----------- stream_tools/dataloader/torio_loader.py | 89 ++++++++++++++++++++ stream_tools/utils/__init__.py | 5 ++ stream_tools/utils/color.py | 22 +++++ 8 files changed, 215 insertions(+), 67 deletions(-) create mode 100644 stream_tools/dataloader/torio_loader.py create mode 100644 stream_tools/utils/__init__.py create mode 100644 stream_tools/utils/color.py diff --git a/docker-compose.yml b/docker-compose.yml index ccb6979..0bda8db 100644 --- 
a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,8 @@ services: tty: true # docker run -t container_name: stream_tools shm_size: '2gb' + volumes: + - ./:/usr/src/app deploy: resources: limits: diff --git a/pyproject.toml b/pyproject.toml index e560e24..4ce2272 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,6 +56,8 @@ dependencies = [ "opencv-python-headless>=4.6.0", "numpy>=1.26.4", "numba>=0.59.1", + "pytz>=2024.1", + "PyYAML>=6.0.1", ] # Optional dependencies ------------------------------------------------------------------------------------------------ diff --git a/stream_tools/dataloader/__init__.py b/stream_tools/dataloader/__init__.py index e69de29..71d4600 100644 --- a/stream_tools/dataloader/__init__.py +++ b/stream_tools/dataloader/__init__.py @@ -0,0 +1,9 @@ +from .base import BaseStreamLoader +from .opencv_loader import OpenCVLoader +from .torio_loader import TorioLoader + +__all__ = [ + 'BaseStreamLoader', + 'OpenCVLoader', + 'TorioLoader', +] \ No newline at end of file diff --git a/stream_tools/dataloader/base.py b/stream_tools/dataloader/base.py index 6c613eb..1c370f4 100644 --- a/stream_tools/dataloader/base.py +++ b/stream_tools/dataloader/base.py @@ -3,8 +3,6 @@ from threading import Thread from typing import Union -import cv2 - logger = logging.getLogger(__name__) @@ -33,7 +31,7 @@ def __init__(self, # Save arguments self.buffer_length = buffer_length # max buffer length self.vid_fps = vid_fps - self.max_first_attempts_to_reconnect = (max_first_attempts_to_reconnect) + self.max_first_attempts_to_reconnect = max_first_attempts_to_reconnect self.first_wait_time = first_wait_time self.second_wait_time = second_wait_time self.running = True # running flag for Thread @@ -45,12 +43,22 @@ def __init__(self, self.fps = [float('inf')] * self.n # fps of each stream self.frames = [0] * self.n # number of frames in each stream self.threads = [None] * self.n # buffer stored streams - self.shape = [[] for _ in range(self.n)] + self.shape = 
[[] for _ in range(self.n)] # shape of image frames self.caps = [None] * self.n # video capture objects - self.started = [0] * self.n + self.started = [True] * self.n # stream started seccessfully or not + self.attempts = [0] * self.n # number of attempts for connect + + def check_attempts(self, i: int, skip_first: bool=False) -> None: + """Sleep depends on number of attempts.""" + if self.attempts[i] == 0 and skip_first: + return + elif self.attempts[i] < self.max_first_attempts_to_reconnect: + time.sleep(self.first_wait_time) + else: + time.sleep(self.second_wait_time) def initialize(self): - # Create a thread for each source and start it + """Create a thread for each source and start it.""" for i, s in enumerate(self.sources): # index, source # Start thread to read frames from video stream self.threads[i] = Thread( @@ -59,23 +67,27 @@ def initialize(self): daemon=True, ) self.threads[i].start() - self.new_fps = (min(self.fps) if isinstance(self.vid_fps, str) and self.vid_fps == 'auto' else self.vid_fps - ) # fps alignment + self.new_fps = ( + min(self.fps) + if isinstance(self.vid_fps, str) and self.vid_fps == 'auto' + else self.vid_fps + ) # fps alignment logger.info('') # newline @property def bs(self): + """Returns number of streams.""" return self.__len__() - def add_source(self, source: str): + def add_source(self, source: str) -> int: + """Add new source to process.""" i = len(self.threads) self.imgs.append(None) self.fps.append(float('inf')) self.frames.append(0) - # self.threads.append(None) self.shape.append([]) self.caps.append(None) - self.started.append(0) + self.started.append(False) self.threads.append(Thread( target=self.update, args=([i, source]), @@ -84,7 +96,8 @@ def add_source(self, source: str): self.threads[i].start() return i - def close_source(self, source: Union[str, int]): + def close_source(self, source: int) -> None: + """Delete source from processing.""" # TODO check source and finish func self.threads = self.threads[:source] + 
self.threads[source + 1:] self.imgs = self.imgs[:source] + self.imgs[source + 1:] @@ -93,8 +106,9 @@ def close_source(self, source: Union[str, int]): if self.threads[source].is_alive(): self.threads[source].join(timeout=5) # Add timeout - def update(self, i, source): - raise NotImplementedError('Implement update function in stream loader class') + def update(self, i: int, source: str) -> None: + """Read stream `i` frames in daemon thread.""" + pass def close(self): """Close stream loader and release resources.""" @@ -102,12 +116,11 @@ def close(self): for thread in self.threads: if thread.is_alive(): thread.join(timeout=5) # Add timeout - for (cap) in (self.caps): # Iterate through the stored VideoCapture objects + for cap in self.caps: # Iterate through the stored VideoCapture objects try: cap.release() # release video capture except Exception as e: logger.warning(f'WARNING ⚠️ Could not release VideoCapture object: {e}') - # cv2.destroyAllWindows() def __iter__(self): """Iterates through image feed and re-opens unresponsive streams.""" @@ -125,10 +138,9 @@ def __next__(self): for i, x in enumerate(self.imgs): # If image is not available if not x: - if not self.threads[i].is_alive() or cv2.waitKey(1) == ord('q'): # q to quit + if not self.threads[i].is_alive(): self.close() raise StopIteration - # logger.warning(f"WARNING ⚠️ Waiting for stream {i}") im = None # Get the last element from buffer else: @@ -142,3 +154,7 @@ def __next__(self): def __len__(self): """Return the length of the sources object.""" return len(self.sources) # 1E12 frames = 32 streams at 30 FPS for 30 years + + def init_stream(self, stream: str, i: int, device: str='cpu') -> bool: + """Init stream and fill the main info about it.""" + pass diff --git a/stream_tools/dataloader/opencv_loader.py b/stream_tools/dataloader/opencv_loader.py index 5af3dc0..f352b45 100644 --- a/stream_tools/dataloader/opencv_loader.py +++ b/stream_tools/dataloader/opencv_loader.py @@ -1,5 +1,4 @@ import logging -import 
time from collections import deque import cv2 @@ -11,70 +10,74 @@ class OpenCVLoader(BaseStreamLoader): - def update(self, i, source): - """Read stream `i` frames in daemon thread.""" - # os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "threads;2" - attempt = 0 - st = f'{i + 1}/{self.n}: {source}... ' - w, h = 0, 0 - while self.caps[i] is None: - if attempt == 0: - pass - elif attempt < self.max_first_attempts_to_reconnect: - time.sleep(self.first_wait_time) - else: - time.sleep(self.second_wait_time) - try: - cap = cv2.VideoCapture(source) - except Exception as ex: - logger.warning(f'Video stream {i} is unresponsive on start: {ex}, reconnecting...') - attempt += 1 - continue - self.caps[i] = cap - success, im = self.caps[i].read() # guarantee first frame - if not success or im is None: - logger.warning(f'{st}Failed to read images from {source}, reconnecting...') - attempt += 1 - self.caps[i] = None - continue - w = int(self.caps[i].get(cv2.CAP_PROP_FRAME_WIDTH)) - h = int(self.caps[i].get(cv2.CAP_PROP_FRAME_HEIGHT)) - if w == 0 or h == 0: - attempt += 1 - self.caps[i] = None - continue - self.fps[i] = self.caps[i].get(cv2.CAP_PROP_FPS) + def init_stream(self, stream: str, i: int, device: str='cpu') -> bool: + """Init stream and fill the main info about it.""" + assert 'cpu' in device, f'Only cpu device now supported, got {device}.' 
+ success, im = False, None + try: + cap = cv2.VideoCapture(stream) + success, im = cap.read() # guarantee first frame + except Exception as ex: + logger.warning(f'Video stream {i} is unresponsive on start: {ex}, reconnecting...') + self.attempts[i] += 1 + return success + + if not success or im is None: + logger.warning(f'Failed to read images from {stream}, reconnecting...') + self.attempts[i] += 1 + return success + + w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + if w == 0 or h == 0: + logger.warning(f'Failed to read shape of images from {stream}, reconnecting...') + self.attempts[i] += 1 + return success + + success = True + self.started[i] = True + self.caps[i] = cap + self.fps[i] = cap.get(cv2.CAP_PROP_FPS) self.frames[i] = float('inf') self.shape[i] = [w, h] buf = deque(maxlen=self.buffer_length) buf.append(im) self.imgs[i] = buf - logger.info(f'{st}Success ✅ ({self.frames[i]} frames of shape {w}x{h} at {self.fps[i]:.2f} FPS)') - attempt = 0 + + return success + + def update(self, i: int, source: str) -> None: + """Read stream `i` frames in daemon thread.""" + self.attempts[i] = 0 + st = f'{i + 1}/{self.n}: {source}... 
' + + # main init stream loop (can be infinity) + while not self.init_stream(source, i): + self.check_attempts(i, skip_first=True) + + logger.info( + f'{st}Success ✅ ({self.frames[i]} frames of shape {self.shape[i][0]}x{self.shape[i][1]} at {self.fps[i]:.2f} FPS)' + ) + self.attempts[i] = 0 n, f = 0, self.frames[i] # frame number, frame array - self.started[i] = 1 + cap = self.caps[i] while self.running and n < (f - 1): # and cap.isOpened() - success = (cap.grab()) # .read() = .grab() followed by .retrieve() + success = cap.grab() # .read() = .grab() followed by .retrieve() im = self.imgs[i][-1] # Default to last valid image if not success: logger.warning(f'WARNING ⚠️ Video stream {i} unresponsive, please check your IP camera connection.') - if attempt < self.max_first_attempts_to_reconnect: - time.sleep(self.first_wait_time) - else: - time.sleep(self.second_wait_time) - attempt += 1 + self.check_attempts(i, skip_first=False) + self.attempts[i] += 1 cap.open(source) # re-open stream if signal was lost else: success, im = cap.retrieve() - attempt = 0 + self.attempts[i] = 0 if not success: im = None logger.warning(f'WARNING ⚠️ Cannot decode image from video stream {i}. 
Unknown error.') - if attempt < self.max_first_attempts_to_reconnect: - time.sleep(self.first_wait_time) - else: - time.sleep(self.second_wait_time) - attempt += 1 + self.check_attempts(i, skip_first=False) + self.attempts[i] += 1 cap.open(source) self.imgs[i].append(im) n += 1 diff --git a/stream_tools/dataloader/torio_loader.py b/stream_tools/dataloader/torio_loader.py new file mode 100644 index 0000000..4eb3a52 --- /dev/null +++ b/stream_tools/dataloader/torio_loader.py @@ -0,0 +1,89 @@ +import logging +import time + +from collections import deque +import torch +from torchaudio.io import StreamReader + +logger = logging.getLogger(__name__) + +from stream_tools.dataloader import BaseStreamLoader +from stream_tools.utils import yuv_to_rgb + + +class TorioLoader(BaseStreamLoader): + + def init_stream(self, stream: str, i: int, device: str='cuda:0') -> bool: + """Init stream and fill the main info about it.""" + assert 'cuda' in device, f'Only cuda device now supported, got {device}.' + success, im = False, None + try: + cap = StreamReader(stream) + cap.add_video_stream( + frames_per_chunk=1, + buffer_chunk_size=1, + decoder='h264_cuvid', + decoder_option={"threads": "0", "gpu": "0"}, + hw_accel=device, + ) + success = not cap.fill_buffer() + (im, ) = cap.pop_chunks() + except Exception as ex: + logger.warning(f'Video stream {i} is unresponsive on start: {ex}, reconnecting...') + self.attempts[i] += 1 + return success + + if not success or not torch.is_tensor(im): + logger.warning(f'Failed to read images from {stream}, reconnecting...') + self.attempts[i] += 1 + return success + + _, _, h, w = im.shape + + if w == 0 or h == 0: + logger.warning(f'Failed to read shape of images from {stream}, reconnecting...') + self.attempts[i] += 1 + return success + + success = True + self.started[i] = True + self.caps[i] = cap + self.fps[i] = cap.get_src_stream_info(0).frame_rate + self.frames[i] = float('inf') + self.shape[i] = [w, h] + buf = deque(maxlen=self.buffer_length) + 
buf.append(yuv_to_rgb(im)) + self.imgs[i] = buf + + return success + + def update(self, i: int, source: str) -> None: + """Read stream `i` frames in daemon thread.""" + self.attempts[i] = 0 + st = f'{i + 1}/{self.n}: {source}... ' + + # main init stream loop (can be infinity) + while not self.init_stream(source, i): + self.check_attempts(i, skip_first=True) + + logger.info( + f'{st}Success ✅ ({self.frames[i]} frames of shape {self.shape[i][0]}x{self.shape[i][1]} at {self.fps[i]:.2f} FPS)' + ) + self.attempts[i] = 0 + n, f = 0, self.frames[i] # frame number, frame array + cap = self.caps[i] + while self.running and n < (f - 1): # and cap.isOpened() + success = not cap.fill_buffer() # .stream() = .fill_buffer() followed by .pop_chunks() + im = self.imgs[i][-1] # Default to last valid image + if not success: + logger.warning(f'WARNING ⚠️ Video stream {i} unresponsive, please check your IP camera connection.') + self.check_attempts(i, skip_first=False) + self.attempts[i] += 1 + # TODO reopen video stream for torio loader + # cap.open(source) # re-open stream if signal was lost + else: + (im, ) = cap.pop_chunks() + self.imgs[i].append(yuv_to_rgb(im)) + n += 1 + else: + logger.info(f'End of stream {i}.') diff --git a/stream_tools/utils/__init__.py b/stream_tools/utils/__init__.py new file mode 100644 index 0000000..f520c24 --- /dev/null +++ b/stream_tools/utils/__init__.py @@ -0,0 +1,5 @@ +from .color import yuv_to_rgb + +__all__ = [ + "yuv_to_rgb", +] \ No newline at end of file diff --git a/stream_tools/utils/color.py b/stream_tools/utils/color.py new file mode 100644 index 0000000..5d60d4f --- /dev/null +++ b/stream_tools/utils/color.py @@ -0,0 +1,22 @@ +import torch +from torch import Tensor + + +@torch.compile(dynamic=True, backend="eager") +def yuv_to_rgb(frames: Tensor) -> Tensor: + """Converts YUV BCHW dims torch tensor to RGB BCHW dims torch tensor + + :param frames: YUV BCHW dims torch tensor + :return: RGB BCHW dims torch tensor + """ + frames = 
frames.to(torch.float32).div_(255) + y = frames[..., 0, :, :] + u = frames[..., 1, :, :] - 0.5 + v = frames[..., 2, :, :] - 0.5 + + r = y + 1.14 * v + g = y + -0.396 * u - 0.581 * v + b = y + 2.029 * u + + rgb = torch.stack([r, g, b], 1).clamp_(0, 1) + return rgb From 75372f3fd9958a328fde1c368c8c62cc7a21495a Mon Sep 17 00:00:00 2001 From: ilyabasharov Date: Mon, 6 May 2024 15:24:49 +0300 Subject: [PATCH 4/7] [Enhance] Add wait for initialisation --- .pre-commit-config.yaml | 4 ++ Dockerfile.cuda | 2 +- stream_tools/dataloader/__init__.py | 3 +- stream_tools/dataloader/base.py | 35 ++++++++------ stream_tools/dataloader/opencv_loader.py | 2 +- stream_tools/dataloader/torio_loader.py | 58 ++++++++++++------------ stream_tools/model/classifier.py | 2 +- stream_tools/pipeline/mutlitrack.py | 11 ++--- stream_tools/pipeline/pipeline_base.py | 2 +- stream_tools/utils/__init__.py | 3 +- stream_tools/utils/color.py | 2 +- 11 files changed, 66 insertions(+), 58 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index af0d2c5..e1d4efe 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,3 +33,7 @@ repos: rev: v2.2.6 hooks: - id: codespell + exclude: > + (?x)^( + .*\.toml + )$ diff --git a/Dockerfile.cuda b/Dockerfile.cuda index 4e46f98..2c1b444 100644 --- a/Dockerfile.cuda +++ b/Dockerfile.cuda @@ -12,7 +12,7 @@ ARG OS_VERSION=22.04 # Define base image. 
FROM nvidia/cuda:${CUDA_VERSION}-cudnn8-devel-ubuntu${OS_VERSION} AS base -# Dublicate args because of the visibility zone +# Duplicate args because of the visibility zone # https://docs.docker.com/engine/reference/builder/#understand-how-arg-and-from-interact ARG CUDA_VERSION ARG OS_VERSION diff --git a/stream_tools/dataloader/__init__.py b/stream_tools/dataloader/__init__.py index 71d4600..6197fc5 100644 --- a/stream_tools/dataloader/__init__.py +++ b/stream_tools/dataloader/__init__.py @@ -5,5 +5,4 @@ __all__ = [ 'BaseStreamLoader', 'OpenCVLoader', - 'TorioLoader', -] \ No newline at end of file + 'TorioLoader', ] diff --git a/stream_tools/dataloader/base.py b/stream_tools/dataloader/base.py index 1c370f4..12d88c0 100644 --- a/stream_tools/dataloader/base.py +++ b/stream_tools/dataloader/base.py @@ -1,6 +1,6 @@ import logging import time -from threading import Thread +from threading import Event, Thread from typing import Union logger = logging.getLogger(__name__) @@ -38,19 +38,20 @@ def __init__(self, self.sources = sources self.n = len(sources) self.kwargs = kwargs - # Initilize attributes + # Initialize attributes self.imgs = [None] * self.n # buffer with images self.fps = [float('inf')] * self.n # fps of each stream self.frames = [0] * self.n # number of frames in each stream self.threads = [None] * self.n # buffer stored streams + self.events = [None] * self.n # buffer stored events for threads self.shape = [[] for _ in range(self.n)] # shape of image frames self.caps = [None] * self.n # video capture objects - self.started = [True] * self.n # stream started seccessfully or not + self.started = [True] * self.n # stream started successfully or not self.attempts = [0] * self.n # number of attempts for connect - def check_attempts(self, i: int, skip_first: bool=False) -> None: + def check_attempts(self, i: int, skip_first: bool = False) -> None: """Sleep depends on number of attempts.""" - if self.attempts[i] == 0 and skip_first: + if skip_first and 
self.attempts[i] == 0: return elif self.attempts[i] < self.max_first_attempts_to_reconnect: time.sleep(self.first_wait_time) @@ -60,18 +61,19 @@ def check_attempts(self, i: int, skip_first: bool=False) -> None: def initialize(self): """Create a thread for each source and start it.""" for i, s in enumerate(self.sources): # index, source + self.events[i] = Event() # Start thread to read frames from video stream self.threads[i] = Thread( - target=self.update, + target=self.__update, args=([i, s]), daemon=True, ) self.threads[i].start() - self.new_fps = ( - min(self.fps) - if isinstance(self.vid_fps, str) and self.vid_fps == 'auto' - else self.vid_fps - ) # fps alignment + # Wait for the initialization event to be set + self.events[i].wait() + + self.new_fps = (min(self.fps) if isinstance(self.vid_fps, str) and self.vid_fps == 'auto' else self.vid_fps + ) # fps alignment logger.info('') # newline @property @@ -89,7 +91,7 @@ def add_source(self, source: str) -> int: self.caps.append(None) self.started.append(False) self.threads.append(Thread( - target=self.update, + target=self.__update, args=([i, source]), daemon=True, )) @@ -106,9 +108,13 @@ def close_source(self, source: int) -> None: if self.threads[source].is_alive(): self.threads[source].join(timeout=5) # Add timeout + def __update(self, i: int, source: str) -> None: + """System function that calls `update` for each thread.""" + self.update(i, source) + self.events[i].set() # Signal that this thread has finished initializing + def update(self, i: int, source: str) -> None: """Read stream `i` frames in daemon thread.""" - pass def close(self): """Close stream loader and release resources.""" @@ -155,6 +161,5 @@ def __len__(self): """Return the length of the sources object.""" return len(self.sources) # 1E12 frames = 32 streams at 30 FPS for 30 years - def init_stream(self, stream: str, i: int, device: str='cpu') -> bool: + def init_stream(self, stream: str, i: int, device: str = 'cpu') -> bool: """Init stream and 
fill the main info about it.""" - pass diff --git a/stream_tools/dataloader/opencv_loader.py b/stream_tools/dataloader/opencv_loader.py index f352b45..9888540 100644 --- a/stream_tools/dataloader/opencv_loader.py +++ b/stream_tools/dataloader/opencv_loader.py @@ -10,7 +10,7 @@ class OpenCVLoader(BaseStreamLoader): - def init_stream(self, stream: str, i: int, device: str='cpu') -> bool: + def init_stream(self, stream: str, i: int, device: str = 'cpu') -> bool: """Init stream and fill the main info about it.""" assert 'cpu' in device, f'Only cpu device now supported, got {device}.' success, im = False, None diff --git a/stream_tools/dataloader/torio_loader.py b/stream_tools/dataloader/torio_loader.py index 4eb3a52..601d2e0 100644 --- a/stream_tools/dataloader/torio_loader.py +++ b/stream_tools/dataloader/torio_loader.py @@ -1,9 +1,9 @@ import logging -import time - from collections import deque + import torch from torchaudio.io import StreamReader +from torchaudio.utils import ffmpeg_utils logger = logging.getLogger(__name__) @@ -13,19 +13,31 @@ class TorioLoader(BaseStreamLoader): - def init_stream(self, stream: str, i: int, device: str='cuda:0') -> bool: + def init_stream( + self, + stream: str, + i: int, + device: str = 'cuda:0', + decoder: str = 'h264', + ) -> bool: """Init stream and fill the main info about it.""" - assert 'cuda' in device, f'Only cuda device now supported, got {device}.' success, im = False, None + decode_config = { + 'frames_per_chunk': 1, + 'buffer_chunk_size': 1, + 'decoder': decoder, + 'decoder_option': { + 'threads': '0', }, } + if 'cuda' in device: + decode_config['decoder'] = f'{decoder}_cuvid' + decode_config['hw_accel'] = device + decode_config['decoder_option']['gpu'] = '0' + + assert decode_config['decoder'] in ffmpeg_utils.get_video_decoders().keys(), \ + f'Decoder {decoder} is not supported. Please check available decoder.' 
try: cap = StreamReader(stream) - cap.add_video_stream( - frames_per_chunk=1, - buffer_chunk_size=1, - decoder='h264_cuvid', - decoder_option={"threads": "0", "gpu": "0"}, - hw_accel=device, - ) + cap.add_video_stream(**decode_config) success = not cap.fill_buffer() (im, ) = cap.pop_chunks() except Exception as ex: @@ -37,14 +49,13 @@ def init_stream(self, stream: str, i: int, device: str='cuda:0') -> bool: logger.warning(f'Failed to read images from {stream}, reconnecting...') self.attempts[i] += 1 return success - + _, _, h, w = im.shape if w == 0 or h == 0: logger.warning(f'Failed to read shape of images from {stream}, reconnecting...') self.attempts[i] += 1 return success - success = True self.started[i] = True self.caps[i] = cap @@ -54,9 +65,8 @@ def init_stream(self, stream: str, i: int, device: str='cuda:0') -> bool: buf = deque(maxlen=self.buffer_length) buf.append(yuv_to_rgb(im)) self.imgs[i] = buf - return success - + def update(self, i: int, source: str) -> None: """Read stream `i` frames in daemon thread.""" self.attempts[i] = 0 @@ -72,18 +82,10 @@ def update(self, i: int, source: str) -> None: self.attempts[i] = 0 n, f = 0, self.frames[i] # frame number, frame array cap = self.caps[i] - while self.running and n < (f - 1): # and cap.isOpened() - success = not cap.fill_buffer() # .stream() = .fill_buffer() followed by .pop_chunks() - im = self.imgs[i][-1] # Default to last valid image - if not success: - logger.warning(f'WARNING ⚠️ Video stream {i} unresponsive, please check your IP camera connection.') - self.check_attempts(i, skip_first=False) - self.attempts[i] += 1 - # TODO reopen video stream for torio loader - # cap.open(source) # re-open stream if signal was lost - else: - (im, ) = cap.pop_chunks() + for (im, ) in cap.stream(): + if not self.running or n >= (f - 1): + break self.imgs[i].append(yuv_to_rgb(im)) n += 1 - else: - logger.info(f'End of stream {i}.') + + logger.info(f'End of stream {i}.') diff --git a/stream_tools/model/classifier.py 
b/stream_tools/model/classifier.py index 3f7ed6f..d5e526c 100644 --- a/stream_tools/model/classifier.py +++ b/stream_tools/model/classifier.py @@ -48,7 +48,7 @@ def __call__(self, imgs: list) -> Any: def inference(self, imgs: Any): start_time_ns = perf_counter_ns() correct_frame_idx = [] - if type(imgs) != list: + if not isinstance(imgs, list): imgs = [imgs] single = True else: diff --git a/stream_tools/pipeline/mutlitrack.py b/stream_tools/pipeline/mutlitrack.py index 60a5402..68a4954 100644 --- a/stream_tools/pipeline/mutlitrack.py +++ b/stream_tools/pipeline/mutlitrack.py @@ -45,19 +45,19 @@ def __init__(self, self.cams_cfg = cams_cfg self.inf_cfg = inf_cfg # Streams - logger.info(f'Initializing stream loader...') + logger.info('Initializing stream loader...') self.dataloader = dataloader self.dataloader.initialize() - logger.info(f'Stream loader initialized') + logger.info('Stream loader initialized') # Models self.detector = detector self.names = self.detector.names self.detector.initialize() - logger.info(f'Detector initialized') + logger.info('Detector initialized') # Trackers self.trackers = {cam_id: bx.create_tracker(**tracker_cfg) for cam_id in self.cams_cfg.cam_ids} self.poses = {cam_id: dict() for cam_id in self.cams_cfg.cam_ids} - logger.info(f'Trackers initialized') + logger.info('Trackers initialized') # Debug self.debug = debug if self.debug: @@ -71,7 +71,7 @@ def __init__(self, (save_img_path / 'crops' / str(cam_id)).mkdir(exist_ok=True, parents=True) self.save_img_path = save_img_path else: - logger.info(f'Debug mode: OFF') + logger.info('Debug mode: OFF') super(MultiTrackWorker, self).__init__(send, debug) @@ -108,7 +108,6 @@ def log_debug(self, timestamp, results, imgs): # TODO rewrite timestamp_str = timestamp.isoformat('T', 'milliseconds').replace(':', '_').replace('.', '_') tracks = results['tracks'] - dets = results['dets'] for i, (tracks, cam_idx) in enumerate(tracks): img = imgs[cam_idx] img_h, img_w, _ = img.shape diff --git 
a/stream_tools/pipeline/pipeline_base.py b/stream_tools/pipeline/pipeline_base.py index cb2e860..ae1c644 100644 --- a/stream_tools/pipeline/pipeline_base.py +++ b/stream_tools/pipeline/pipeline_base.py @@ -51,7 +51,7 @@ def worker(self) -> None: finally: self.queue.task_done() except Empty as e: - pass + print(e) return def __del__(self): diff --git a/stream_tools/utils/__init__.py b/stream_tools/utils/__init__.py index f520c24..535f6b8 100644 --- a/stream_tools/utils/__init__.py +++ b/stream_tools/utils/__init__.py @@ -1,5 +1,4 @@ from .color import yuv_to_rgb __all__ = [ - "yuv_to_rgb", -] \ No newline at end of file + 'yuv_to_rgb', ] diff --git a/stream_tools/utils/color.py b/stream_tools/utils/color.py index 5d60d4f..ae32de9 100644 --- a/stream_tools/utils/color.py +++ b/stream_tools/utils/color.py @@ -2,7 +2,7 @@ from torch import Tensor -@torch.compile(dynamic=True, backend="eager") +@torch.compile(dynamic=True, backend='eager') def yuv_to_rgb(frames: Tensor) -> Tensor: """Converts YUV BCHW dims torch tensor to RGB BCHW dims torch tensor From d61fba0dd623f3c36947fc48ca6b3a17c404aaa5 Mon Sep 17 00:00:00 2001 From: ilyabasharov Date: Tue, 7 May 2024 13:59:02 +0300 Subject: [PATCH 5/7] [Fix] add opencv cuda decode --- stream_tools/dataloader/base.py | 15 +++------------ stream_tools/dataloader/opencv_loader.py | 17 ++++++++++++++--- stream_tools/dataloader/torio_loader.py | 11 ++++------- stream_tools/utils/__init__.py | 4 +++- stream_tools/utils/ffmpeg.py | 19 +++++++++++++++++++ 5 files changed, 43 insertions(+), 23 deletions(-) create mode 100644 stream_tools/utils/ffmpeg.py diff --git a/stream_tools/dataloader/base.py b/stream_tools/dataloader/base.py index 12d88c0..90253b4 100644 --- a/stream_tools/dataloader/base.py +++ b/stream_tools/dataloader/base.py @@ -1,6 +1,6 @@ import logging import time -from threading import Event, Thread +from threading import Thread from typing import Union logger = logging.getLogger(__name__) @@ -43,7 +43,6 @@ def 
__init__(self, self.fps = [float('inf')] * self.n # fps of each stream self.frames = [0] * self.n # number of frames in each stream self.threads = [None] * self.n # buffer stored streams - self.events = [None] * self.n # buffer stored events for threads self.shape = [[] for _ in range(self.n)] # shape of image frames self.caps = [None] * self.n # video capture objects self.started = [True] * self.n # stream started successfully or not @@ -61,16 +60,13 @@ def check_attempts(self, i: int, skip_first: bool = False) -> None: def initialize(self): """Create a thread for each source and start it.""" for i, s in enumerate(self.sources): # index, source - self.events[i] = Event() # Start thread to read frames from video stream self.threads[i] = Thread( - target=self.__update, + target=self.update, args=([i, s]), daemon=True, ) self.threads[i].start() - # Wait for the initialization event to be set - self.events[i].wait() self.new_fps = (min(self.fps) if isinstance(self.vid_fps, str) and self.vid_fps == 'auto' else self.vid_fps ) # fps alignment @@ -91,7 +87,7 @@ def add_source(self, source: str) -> int: self.caps.append(None) self.started.append(False) self.threads.append(Thread( - target=self.__update, + target=self.update, args=([i, source]), daemon=True, )) @@ -108,11 +104,6 @@ def close_source(self, source: int) -> None: if self.threads[source].is_alive(): self.threads[source].join(timeout=5) # Add timeout - def __update(self, i: int, source: str) -> None: - """System function that calls `update` for each thread.""" - self.update(i, source) - self.events[i].set() # Signal that this thread has finished initializing - def update(self, i: int, source: str) -> None: """Read stream `i` frames in daemon thread.""" diff --git a/stream_tools/dataloader/opencv_loader.py b/stream_tools/dataloader/opencv_loader.py index 9888540..cc3ace8 100644 --- a/stream_tools/dataloader/opencv_loader.py +++ b/stream_tools/dataloader/opencv_loader.py @@ -1,4 +1,5 @@ import logging +import os 
from collections import deque import cv2 @@ -6,16 +7,26 @@ logger = logging.getLogger(__name__) from stream_tools.dataloader import BaseStreamLoader +from stream_tools.utils import make_ffmpeg_decoder class OpenCVLoader(BaseStreamLoader): - def init_stream(self, stream: str, i: int, device: str = 'cpu') -> bool: + def init_stream( + self, + stream: str, + i: int, + device: str = 'cpu', + decoder: str = 'h264', + ) -> bool: """Init stream and fill the main info about it.""" - assert 'cpu' in device, f'Only cpu device now supported, got {device}.' success, im = False, None + gpu_flag = 'cuda' in device + ffmpeg_option = f'video_codec;{make_ffmpeg_decoder(decoder, gpu_flag)}' + os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = ffmpeg_option try: - cap = cv2.VideoCapture(stream) + cap = cv2.VideoCapture(stream, cv2.CAP_FFMPEG) + cap.set(cv2.CAP_PROP_CONVERT_RGB, 1.) success, im = cap.read() # guarantee first frame except Exception as ex: logger.warning(f'Video stream {i} is unresponsive on start: {ex}, reconnecting...') diff --git a/stream_tools/dataloader/torio_loader.py b/stream_tools/dataloader/torio_loader.py index 601d2e0..78a0da9 100644 --- a/stream_tools/dataloader/torio_loader.py +++ b/stream_tools/dataloader/torio_loader.py @@ -3,12 +3,11 @@ import torch from torchaudio.io import StreamReader -from torchaudio.utils import ffmpeg_utils logger = logging.getLogger(__name__) from stream_tools.dataloader import BaseStreamLoader -from stream_tools.utils import yuv_to_rgb +from stream_tools.utils import make_ffmpeg_decoder, yuv_to_rgb class TorioLoader(BaseStreamLoader): @@ -22,19 +21,17 @@ def init_stream( ) -> bool: """Init stream and fill the main info about it.""" success, im = False, None + gpu_flag = 'cuda' in device + decoder = make_ffmpeg_decoder(decoder, gpu_flag) decode_config = { 'frames_per_chunk': 1, 'buffer_chunk_size': 1, 'decoder': decoder, 'decoder_option': { 'threads': '0', }, } - if 'cuda' in device: - decode_config['decoder'] = f'{decoder}_cuvid' + if 
gpu_flag: decode_config['hw_accel'] = device decode_config['decoder_option']['gpu'] = '0' - - assert decode_config['decoder'] in ffmpeg_utils.get_video_decoders().keys(), \ - f'Decoder {decoder} is not supported. Please check available decoder.' try: cap = StreamReader(stream) cap.add_video_stream(**decode_config) diff --git a/stream_tools/utils/__init__.py b/stream_tools/utils/__init__.py index 535f6b8..dab88ea 100644 --- a/stream_tools/utils/__init__.py +++ b/stream_tools/utils/__init__.py @@ -1,4 +1,6 @@ from .color import yuv_to_rgb +from .ffmpeg import make_ffmpeg_decoder __all__ = [ - 'yuv_to_rgb', ] + 'yuv_to_rgb', + 'make_ffmpeg_decoder', ] diff --git a/stream_tools/utils/ffmpeg.py b/stream_tools/utils/ffmpeg.py new file mode 100644 index 0000000..b85061c --- /dev/null +++ b/stream_tools/utils/ffmpeg.py @@ -0,0 +1,19 @@ +from torchaudio.utils import ffmpeg_utils + +def make_ffmpeg_decoder(decoder: str, gpu: bool) -> str: + '''Check and make ffmpeg decoder + Inputs: + decoder: str + device: bool if true than gpu, false - cpu + + Returns: + str + ''' + + if gpu: + decoder = f'{decoder}_cuvid' + + assert decoder in ffmpeg_utils.get_video_decoders(), \ + f'Decoder {decoder} is not supported in FFmpeg. Please check available decoder.' 
+ + return decoder From 129f98c4d085f7375665f7d6df2be465c3ef3ae9 Mon Sep 17 00:00:00 2001 From: ilyabasharov Date: Tue, 7 May 2024 13:59:15 +0300 Subject: [PATCH 6/7] fix --- stream_tools/utils/ffmpeg.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/stream_tools/utils/ffmpeg.py b/stream_tools/utils/ffmpeg.py index b85061c..f53f7c6 100644 --- a/stream_tools/utils/ffmpeg.py +++ b/stream_tools/utils/ffmpeg.py @@ -1,11 +1,12 @@ from torchaudio.utils import ffmpeg_utils + def make_ffmpeg_decoder(decoder: str, gpu: bool) -> str: '''Check and make ffmpeg decoder Inputs: decoder: str - device: bool if true than gpu, false - cpu - + device: bool if true than gpu, false - cpu + Returns: str ''' @@ -15,5 +16,5 @@ def make_ffmpeg_decoder(decoder: str, gpu: bool) -> str: assert decoder in ffmpeg_utils.get_video_decoders(), \ f'Decoder {decoder} is not supported in FFmpeg. Please check available decoder.' - + return decoder From e1606e7a74338ef6db5fc24267f212c2721f5358 Mon Sep 17 00:00:00 2001 From: ilyabasharov Date: Tue, 7 May 2024 14:35:32 +0300 Subject: [PATCH 7/7] [Enhance] add gpu-optimized letterbox --- stream_tools/utils/transform.py | 57 +++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 stream_tools/utils/transform.py diff --git a/stream_tools/utils/transform.py b/stream_tools/utils/transform.py new file mode 100644 index 0000000..a41340a --- /dev/null +++ b/stream_tools/utils/transform.py @@ -0,0 +1,57 @@ +from typing import Tuple, List + +import torch +from torch import Tensor +import torch.nn.functional as F +from torchvision.transforms.functional import resize +from torchvision.transforms import InterpolationMode + + +@torch.compile(dynamic=True, backend='eager') +def letterbox( + img: Tensor, + new_shape: Tuple[int, int] = (640, 640), + auto: bool = False, + scale_fill: bool = False, + scaleup: bool = False, + stride: int = 32, +) -> Tuple[Tensor, Tuple[float, float], Tuple[float,float], List[int]]: + 
'''Letterbox from https://github.com/ultralytics/ultralytics/blob/537c50e45f94b214338c6e53bc9822b15fe3a595/ultralytics/data/augment.py#L740 + Inputs: + img: float BxCxHxW + ''' + + shape = img.shape[-2:] # current shape [height, width] + if isinstance(new_shape, int): + new_shape = (new_shape, new_shape) + + # Scale ratio (new / old) + r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) + if not scaleup: # only scale down, do not scale up (for better test mAP) + r = min(r, 1.0) + + # Compute padding + ratio = r, r # width, height ratios + new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)), + dw, dh = float(new_shape[1] - new_unpad[0]), float(new_shape[0] - new_unpad[1]) # wh padding + if auto: # minimum rectangle + # dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding + dw, dh = torch.remainder(dw, stride), torch.remainder(dh, stride) + elif scale_fill: # stretch + dw, dh = 0.0, 0.0 + new_unpad = (new_shape[1], new_shape[0]) + ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios + + dw /= 2 # divide padding into 2 sides + dh /= 2 + + if shape[::-1] != new_unpad: # resize + # img = F.interpolate(img, size=new_unpad[::-1]) + img = resize(img, new_unpad[::-1], interpolation=InterpolationMode.BILINEAR, antialias=True) + # img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR) + top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) + left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) + # img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border + pad = (left,right,top,bottom) + img = F.pad(img, pad, value=114.0) + return img, ratio, (dw, dh), shape