diff --git a/.github/workflows/auto-pr-autodelete.yml b/.github/workflows/auto-pr-autodelete.yml new file mode 100644 index 000000000..21bdfc58a --- /dev/null +++ b/.github/workflows/auto-pr-autodelete.yml @@ -0,0 +1,31 @@ +name: Auto PR Branch Cleanup + +on: + pull_request: + types: + - closed + +jobs: + clean-auto-pr: + # Run only for merged, same-repository PRs whose head branch starts with "auto-pr-". + if: ${{ github.event.pull_request.merged && + github.event.pull_request.head.repo.id == github.event.pull_request.base.repo.id && + startsWith(github.event.pull_request.head.ref, 'auto-pr-') }} + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + token: ${{ secrets.AUTOPR_SECRET }} + + - name: set git config + # Pass the branch name through the environment so it is never spliced into the shell script. + env: + HEAD_REF: ${{ github.event.pull_request.head.ref }} + run: | + git config --global user.email "${GITHUB_ACTOR_ID}+${GITHUB_ACTOR}@users.noreply.github.com" + git config --global user.name "${GITHUB_ACTOR}" + git config --global advice.mergeConflict false + git config --global --add safe.directory "${{ github.workspace }}" + git config -l + git push -d origin "${HEAD_REF}" \ No newline at end of file diff --git a/.github/workflows/auto-pr-new.yml b/.github/workflows/auto-pr-precise.yml similarity index 66% rename from .github/workflows/auto-pr-new.yml rename to .github/workflows/auto-pr-precise.yml index c47682234..b06e5a41a 100644 --- a/.github/workflows/auto-pr-new.yml +++ b/.github/workflows/auto-pr-precise.yml @@ -12,6 +12,7 @@ jobs: - uses: actions/checkout@v4 with: fetch-depth: 0 + token: ${{ secrets.AUTOPR_SECRET }} - name: set git config run: | @@ -56,44 +57,36 @@ jobs: set -x git checkout ${{steps.branch_info.outputs.NEXT_BRANCH}} git checkout -b ${{steps.create_branch.outputs.PRBRANCH}} - echo "MESSAGE<<__EOF" >> $GITHUB_OUTPUT - git log --format=%B ${{ github.event.commits[0].id }}~..${{ github.event.after }} >> $GITHUB_OUTPUT - echo "__EOF" >> $GITHUB_OUTPUT + echo "TITLE<<__AUTOPR_EOF" >> $GITHUB_OUTPUT + git log --format="| %s" ${{ github.event.commits[0].id }}~..${{ github.event.after }} | tr '\n' ' ' >> 
$GITHUB_OUTPUT + echo "" >> $GITHUB_OUTPUT + echo "__AUTOPR_EOF" >> $GITHUB_OUTPUT + echo "MESSAGE<<__AUTOPR_EOF" >> $GITHUB_OUTPUT + git log --format="> %B" ${{ github.event.commits[0].id }}~..${{ github.event.after }} >> $GITHUB_OUTPUT + echo "__AUTOPR_EOF" >> $GITHUB_OUTPUT + cat $GITHUB_OUTPUT REVS=$(git rev-list --reverse ${{ github.event.commits[0].id }}~..${{ github.event.after }} ) for rev in "$REVS"; do - if ! git cherry-pick -X theirs ${rev} ; then - echo "merge=false" >> $GITHUB_OUTPUT - exit 0; + if ! git cherry-pick ${rev} ; then + git add -u + git -c core.editor=true cherry-pick --continue fi done git push -d origin ${{steps.create_branch.outputs.PRBRANCH}} || true git push -u origin ${{steps.create_branch.outputs.PRBRANCH}} - echo "merge=true" >> $GITHUB_OUTPUT - uses: actions/github-script@v7 name: Open pick PR to ${{steps.branch_info.outputs.NEXT_BRANCH}} - if: steps.merge-changes.outputs.merge == 'true' + env: + TITLE: ${{steps.merge-changes.outputs.TITLE}} + MESSAGE: ${{steps.merge-changes.outputs.MESSAGE}} with: - github-token: ${{ github.token }} + github-token: ${{ secrets.AUTOPR_SECRET }} script: | await github.rest.pulls.create({ ...context.repo, - title: `Pick ${{steps.branch_info.outputs.CURRENT_VERSION}} to ${{steps.branch_info.outputs.NEXT_VERSION}}`, + title: `[Pick][${{steps.branch_info.outputs.CURRENT_VERSION}} to ${{steps.branch_info.outputs.NEXT_VERSION}}] ` + process.env.TITLE, head: `${{steps.create_branch.outputs.PRBRANCH}}`, base: `${{steps.branch_info.outputs.NEXT_BRANCH}}`, - body: `${{steps.merge-changes.outputs.MESSAGE}}\nGenerated by Auto PR, by cherry-pick related commits`, - }); - - - uses: actions/github-script@v7 - name: Open merge PR to ${{steps.branch_info.outputs.NEXT_BRANCH}} - if: steps.merge-changes.outputs.merge == 'false' - with: - github-token: ${{ github.token }} - script: | - await github.rest.pulls.create({ - ...context.repo, - title: `Merge ${{steps.branch_info.outputs.CURRENT_VERSION}} to 
${{steps.branch_info.outputs.NEXT_VERSION}}`, - head: `${context.ref}`, - base: `${{steps.branch_info.outputs.NEXT_BRANCH}}`, - body: `${{steps.merge-changes.outputs.MESSAGE}}\nGenerated by Auto PR, using merge since cherry-pick failed`, + body: process.env.MESSAGE + `\nGenerated by Auto PR, by cherry-pick related commits`, }); diff --git a/.github/workflows/auto-pr.yml b/.github/workflows/auto-pr.yml.obsolete similarity index 100% rename from .github/workflows/auto-pr.yml rename to .github/workflows/auto-pr.yml.obsolete diff --git a/.github/workflows/ci.linux.arm.yml b/.github/workflows/ci.linux.arm.yml index c9b2c6b1f..d084a9c68 100644 --- a/.github/workflows/ci.linux.arm.yml +++ b/.github/workflows/ci.linux.arm.yml @@ -1,14 +1,12 @@ name: Linux ARM on: - push: - branches: [ "main", "release/*" ] pull_request: branches: [ "main", "release/*" ] jobs: - gcc921: - runs-on: [self-hosted, Linux, ARM64] + arm-linux-build-and-test: + runs-on: ubuntu-24.04-arm container: image: ghcr.io/alibaba/photon-ut-base:latest @@ -23,7 +21,7 @@ jobs: - uses: actions/checkout@v4 - - name: Build + - name: Build-Debug-921-C++14 run: | source /opt/rh/gcc-toolset-9/enable cmake -B build \ @@ -36,31 +34,42 @@ jobs: -D PHOTON_ENABLE_EXTFS=ON cmake --build build -j $(nproc) -- VERBOSE=1 - - name: Test + - name: Build-Debug-921-C++17 run: | - cd build - ctest -E test-lockfree --timeout 3600 -V - - gcc921-build-debug: - runs-on: [self-hosted, Linux, ARM64] - - container: - image: ghcr.io/alibaba/photon-ut-base:latest - options: --cpus 4 - - steps: - - uses: szenius/set-timezone@v2.0 - with: - timezoneLinux: "Asia/Shanghai" - timezoneMacos: "Asia/Shanghai" - timezoneWindows: "China Standard Time" + source /opt/rh/gcc-toolset-9/enable + rm -fr build + cmake -B build \ + -D PHOTON_CXX_STANDARD=17 \ + -D CMAKE_BUILD_TYPE=Debug \ + -D PHOTON_ENABLE_ECOSYSTEM=ON \ + -D PHOTON_BUILD_TESTING=ON \ + -D PHOTON_ENABLE_SASL=ON \ + -D PHOTON_ENABLE_FUSE=ON \ + -D PHOTON_ENABLE_LIBCURL=ON \ + -D 
PHOTON_ENABLE_EXTFS=ON + cmake --build build -j $(nproc) -- VERBOSE=1 - - uses: actions/checkout@v4 + - name: Build-Debug-1121-C++20 + run: | + source /opt/rh/gcc-toolset-11/enable + rm -fr build + cmake -B build \ + -D PHOTON_CXX_STANDARD=20 \ + -D CMAKE_BUILD_TYPE=Debug \ + -D PHOTON_ENABLE_ECOSYSTEM=ON \ + -D PHOTON_BUILD_TESTING=ON \ + -D PHOTON_ENABLE_SASL=ON \ + -D PHOTON_ENABLE_FUSE=ON \ + -D PHOTON_ENABLE_LIBCURL=ON \ + -D PHOTON_ENABLE_EXTFS=ON + cmake --build build -j $(nproc) -- VERBOSE=1 - - name: Build + - name: Build-Debug-1211-C++23 run: | - source /opt/rh/gcc-toolset-9/enable + source /opt/rh/gcc-toolset-12/enable + rm -fr build cmake -B build \ + -D PHOTON_CXX_STANDARD=23 \ -D CMAKE_BUILD_TYPE=Debug \ -D PHOTON_ENABLE_ECOSYSTEM=ON \ -D PHOTON_BUILD_TESTING=ON \ @@ -69,3 +78,8 @@ jobs: -D PHOTON_ENABLE_LIBCURL=ON \ -D PHOTON_ENABLE_EXTFS=ON cmake --build build -j $(nproc) -- VERBOSE=1 + + - name: Test + run: | + cd build + ctest -E test-lockfree --timeout 3600 -V diff --git a/.github/workflows/ci.linux.x86_64.yml b/.github/workflows/ci.linux.x86_64.yml index c8119a68e..f63773fd0 100644 --- a/.github/workflows/ci.linux.x86_64.yml +++ b/.github/workflows/ci.linux.x86_64.yml @@ -1,8 +1,6 @@ name: Linux x86_64 on: - push: - branches: [ "main", "release/*" ] pull_request: branches: [ "main", "release/*" ] diff --git a/.github/workflows/ci.macos.arm.yml b/.github/workflows/ci.macos.arm.yml index f999b6675..dc0883e06 100644 --- a/.github/workflows/ci.macos.arm.yml +++ b/.github/workflows/ci.macos.arm.yml @@ -1,14 +1,12 @@ name: macOS ARM on: - push: - branches: [ "main", "release/*" ] pull_request: branches: [ "main", "release/*" ] jobs: macOS14-arm: - runs-on: macos-14 + runs-on: macos-15 steps: - uses: szenius/set-timezone@v2.0 @@ -22,17 +20,18 @@ jobs: - name: Install Dependencies shell: bash run: | - brew install cmake openssl gflags googletest gsasl + brew install openssl gflags googletest gsasl - name: Build run: | cmake -B 
${{github.workspace}}/build \ + -D PHOTON_CXX_STANDARD=17 \ -D PHOTON_ENABLE_ECOSYSTEM=ON \ -D PHOTON_BUILD_TESTING=ON \ -D CMAKE_BUILD_TYPE=MinSizeRel \ -D PHOTON_ENABLE_SASL=ON \ -D PHOTON_ENABLE_LIBCURL=ON \ - -D OPENSSL_ROOT_DIR=/opt/homebrew/Cellar/openssl@1.1/1.1.1w + -D OPENSSL_ROOT_DIR=/opt/homebrew/opt/openssl@3 cmake --build ${{github.workspace}}/build -j $(sysctl -n hw.logicalcpu) - name: Test diff --git a/.github/workflows/ci.macos.x86_64.yml b/.github/workflows/ci.macos.x86_64.yml index 795f304ab..c244bd424 100644 --- a/.github/workflows/ci.macos.x86_64.yml +++ b/.github/workflows/ci.macos.x86_64.yml @@ -1,14 +1,12 @@ name: macOS x86_64 on: - push: - branches: [ "main", "release/*" ] pull_request: branches: [ "main", "release/*" ] jobs: macOS13-x86: - runs-on: macos-13 + runs-on: macos-15 steps: - uses: szenius/set-timezone@v2.0 @@ -22,17 +20,18 @@ jobs: - name: Install Dependencies shell: bash run: | - brew install cmake openssl gflags googletest gsasl + brew install openssl gflags googletest gsasl - name: Build run: | cmake -B ${{github.workspace}}/build \ + -D PHOTON_CXX_STANDARD=17 \ -D PHOTON_ENABLE_ECOSYSTEM=ON \ -D PHOTON_BUILD_TESTING=ON \ -D CMAKE_BUILD_TYPE=MinSizeRel \ -D PHOTON_ENABLE_SASL=ON \ -D PHOTON_ENABLE_LIBCURL=ON \ - -D OPENSSL_ROOT_DIR=/usr/local/opt/openssl@3 + -D OPENSSL_ROOT_DIR=/opt/homebrew/opt/openssl@3 cmake --build ${{github.workspace}}/build -j $(sysctl -n hw.logicalcpu) - name: Test diff --git a/CMakeLists.txt b/CMakeLists.txt index 54d08d06b..20659174d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,6 +6,9 @@ project( LANGUAGES C CXX ASM ) +# for CMake 4.x compatible +set(CMAKE_POLICY_VERSION_MINIMUM 3.5) + # Utility Modules and Find Modules include(FindPackageHandleStandardArgs) include(CheckCXXCompilerFlag) @@ -28,6 +31,7 @@ option(PHOTON_ENABLE_FSTACK_DPDK "Use f-stack + DPDK as the event engine" OFF) option(PHOTON_ENABLE_EXTFS "enable extfs" OFF) option(PHOTON_ENABLE_ECOSYSTEM "enable 
ecosystem" OFF) option(PHOTON_ENABLE_LIBCURL "enable libcurl" ON) +set(PHOTON_DEFAULT_LOG_LEVEL "0" CACHE STRING "default log level") option(PHOTON_BUILD_DEPENDENCIES "" OFF) set(PHOTON_AIO_SOURCE "https://pagure.io/libaio/archive/libaio-0.3.113/libaio-0.3.113.tar.gz" CACHE STRING "") @@ -42,7 +46,7 @@ set(PHOTON_E2FS_SOURCE "" CACHE STRING "") set(PHOTON_GFLAGS_SOURCE "https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz" CACHE STRING "") set(PHOTON_GOOGLETEST_SOURCE "https://github.com/google/googletest/archive/refs/tags/release-1.12.1.tar.gz" CACHE STRING "") set(PHOTON_RAPIDJSON_GIT "https://github.com/Tencent/rapidjson.git" CACHE STRING "") -set(PHOTON_RAPIDXML_SOURCE "https://sourceforge.net/projects/rapidxml/files/rapidxml/rapidxml%201.13/rapidxml-1.13.zip/download" CACHE STRING "") +set(PHOTON_RAPIDXML_SOURCE "https://github.com/photonlibos/rapidxml" CACHE STRING "") set(PHOTON_RAPIDYAML_SOURCE "https://github.com/biojppm/rapidyaml/releases/download/v0.5.0/rapidyaml-0.5.0.hpp" CACHE STRING "") set(PHOTON_CPP_REDIS_SOURCE "https://github.com/cpp-redis/cpp_redis/archive/refs/tags/4.3.1.tar.gz" CACHE STRING "") @@ -64,6 +68,7 @@ if (PHOTON_BUILD_WITH_ASAN) if ((NOT CMAKE_BUILD_TYPE STREQUAL "Debug") OR (NOT CMAKE_SYSTEM_NAME STREQUAL "Linux")) message(FATAL_ERROR "Wrong environment") endif () + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -static-libasan") add_link_options(-fsanitize=address -static-libasan) endif () @@ -257,6 +262,9 @@ endif() if (PHOTON_ENABLE_LIBCURL) target_compile_definitions(photon_obj PRIVATE ENABLE_CURL) endif() +if (PHOTON_DEFAULT_LOG_LEVEL) + target_compile_definitions(photon_obj PRIVATE DEFAULT_LOG_LEVEL=${PHOTON_DEFAULT_LOG_LEVEL}) +endif() if (actually_built) add_dependencies(photon_obj ${actually_built}) diff --git a/common/alog.cpp b/common/alog.cpp index 759722ee2..43294f957 100644 --- a/common/alog.cpp +++ b/common/alog.cpp @@ -77,7 +77,10 @@ class LogOutputNull : public BaseLogOutput { static 
LogOutputNull _log_output_null; ILogOutput* const log_output_null = &_log_output_null; -ALogLogger default_logger {log_output_stdout, ALOG_DEBUG}; +#ifndef DEFAULT_LOG_LEVEL +#define DEFAULT_LOG_LEVEL ALOG_DEBUG +#endif +ALogLogger default_logger {log_output_stdout, DEFAULT_LOG_LEVEL}; ALogLogger default_audit_logger {log_output_null, ALOG_AUDIT}; uint32_t& log_output_level = default_logger.log_level; @@ -204,23 +207,26 @@ void LogFormatter::put_integer_dec(ALogBuffer& buf, ALogInteger x) } __attribute__((constructor)) static void __initial_timezone() { tzset(); } -static time_t dayid = 0; +static time_t dayid = 0, minuteid = 0, tsdelta = 0; static struct tm alog_time = {0}; -static struct tm* alog_update_time(time_t now) -{ - auto now0 = now; +static struct tm* alog_update_time(time_t now0) { + auto now = now0 + tsdelta; int sec = now % 60; now /= 60; + if (unlikely(now != minuteid)) { // calibrate wall time every minute + now = time(0) - timezone; + tsdelta = now - now0; + sec = now % 60; now /= 60; + minuteid = now; + } int min = now % 60; now /= 60; int hor = now % 24; now /= 24; - if (now != dayid) - { + if (now != dayid) { dayid = now; - gmtime_r(&now0, &alog_time); + auto now_ = now0 + tsdelta; + gmtime_r(&now_, &alog_time); alog_time.tm_year+=1900; alog_time.tm_mon++; - } - else - { + } else { alog_time.tm_sec = sec; alog_time.tm_min = min; alog_time.tm_hour = hor; @@ -484,7 +490,7 @@ LogBuffer& operator << (LogBuffer& log, const Prologue& pro) auto t = &alog_time; #else auto ts = photon::__update_now(); - auto t = alog_update_time(ts.sec() - timezone); + auto t = alog_update_time(ts.sec()); #endif #define DEC_W2P0(x) DEC(x).width(2).padding('0') log.printf(t->tm_year, '/'); diff --git a/common/alog.h b/common/alog.h index 4ed36ec1f..1a79f467d 100644 --- a/common/alog.h +++ b/common/alog.h @@ -220,7 +220,7 @@ class LogFormatter buf.consume(s.size); } - void put(ALogBuffer& buf, void* p) + void put(ALogBuffer& buf, const void* p) { put(buf, 
HEX((uint64_t)p).width(16)); } @@ -554,22 +554,31 @@ inline LogBuffer& operator<<(LogBuffer& log, const NamedValue& v) { return log.printf('[', v.name, '=', v.value, ']'); } +inline void _alog_set_errno(int x) { errno = x; } + +struct DeferrdErrnoSetter { + int _errno = 0; + void operator()(int x) { _errno = x; } + ~DeferrdErrnoSetter() { if (_errno) errno = _errno; } +}; + +#define DECLARE_ERRNO_SETTER DeferrdErrnoSetter _alog_set_errno; + // output a log message, set errno, then return a value // keep errno unchaged if new_errno == 0 #define LOG_ERROR_RETURN(new_errno, retv, ...) { \ int xcode = (int)(new_errno); \ if (xcode == 0) xcode = errno; \ LOG_ERROR(__VA_ARGS__); \ - errno = xcode; \ + _alog_set_errno(xcode); \ return retv; \ } // output a log message with errno info, set errno, then return a value // keep errno unchaged if new_errno == 0 #define LOG_ERRNO_RETURN(new_errno, retv, ...) { \ - ERRNO eno; \ - LOG_ERROR(__VA_ARGS__, ' ', eno); \ - if (new_errno) eno.set(new_errno); \ + LOG_ERROR(__VA_ARGS__, ' ', ERRNO()); \ + if (new_errno) _alog_set_errno(new_errno); \ return retv; \ } diff --git a/common/consistent-hash-map.h b/common/consistent-hash-map.h index b108a4e32..3e9531416 100644 --- a/common/consistent-hash-map.h +++ b/common/consistent-hash-map.h @@ -55,10 +55,10 @@ class consistent_hash_map const allocator_type& alloc = allocator_type()) : m_comp(comp), m_vector(alloc) { } - using reference = typename allocator_type::reference; - using const_reference = typename allocator_type::const_reference; - using pointer = typename allocator_type::pointer; - using const_pointer = typename allocator_type::const_pointer; + using reference = mapped_type&; + using const_reference = const mapped_type&; + using pointer = mapped_type*; + using const_pointer = const mapped_type*; using container_type = std::vector; using iterator = typename container_type::iterator; using const_iterator = typename container_type::const_iterator; diff --git a/common/enumerable.h 
b/common/enumerable.h index ab86c4e72..312f2d9b9 100644 --- a/common/enumerable.h +++ b/common/enumerable.h @@ -41,7 +41,12 @@ struct Enumerable if (obj && !obj->valid()) this->obj = nullptr; } +#if __cplusplus < 201703L using R = typename std::result_of::type; +#else + using R = typename std::invoke_result::type; +#endif + R operator*() { return obj ? obj->get() : R{}; } bool operator==(const iterator& rhs) const { return obj == rhs.obj; } bool operator!=(const iterator& rhs) const { return !(*this == rhs); } diff --git a/common/estring.h b/common/estring.h index 4a562fe9f..14e04bef0 100644 --- a/common/estring.h +++ b/common/estring.h @@ -22,6 +22,7 @@ limitations under the License. #include #include #include +#include #include #include @@ -45,14 +46,14 @@ struct charset : std::bitset<256> // charset(const char (&s) [N]) : // charset(std::string_view(s, N - 1)) { } - // bool test(char ch) const - // { - // return std::bitset<256>::test((unsigned char)ch); - // } - // bitset& set(char ch, bool value = true) - // { - // return std::bitset<256>::set((unsigned char)ch, value); - // } + bool test(char ch) const + { + return std::bitset<256>::test((unsigned char)ch); + } + bitset& set(char ch, bool value = true) + { + return std::bitset<256>::set((unsigned char)ch, value); + } }; class estring_view : public std::string_view @@ -75,6 +76,42 @@ class estring_view : public std::string_view size_t find_last_of(const charset& set) const; size_t find_last_not_of(const charset& set) const; + template + class Extraction { + char _buf[N]; + std::unique_ptr _s; + bool _ownership; + public: + Extraction(std::string_view sv, bool strict = false) { + //for regular strings, sv[sv.size()] should be accessible + if (!strict && sv[sv.size()] == '\0') { + _s.reset((char*)sv.data()); + _ownership = false; + } else if (sv.size() < N) { + memcpy(_buf, sv.data(), sv.size()); + _buf[sv.size()] = '\0'; + _s.reset(_buf); + _ownership = false; + } else { + auto ptr = (char*)malloc(sv.size() + 
1); + memcpy(ptr, sv.data(), sv.size()); + ptr[sv.size()] = '\0'; + _s.reset(ptr); + _ownership = true; + } + } + operator const char* () const { + return _s.get(); + } + ~Extraction() { + if (!_ownership) + _s.release(); + } + }; + template + Extraction extract_c_str() const { + return {*this}; + } operator std::string () { return to_string(); @@ -160,10 +197,10 @@ class estring_view : public std::string_view } iterator& operator++() { - _part = _host->find_part(_part.end()); + _part = _host->find_part(_part.end() + 1); return *this; } - iterator& operator++(int) + iterator operator++(int) { auto ret = *this; ++(*this); @@ -171,7 +208,8 @@ class estring_view : public std::string_view } bool operator == (const iterator& rhs) const { - return _part == rhs._part; + return _part.data() == rhs._part.data() && + _part.length() == rhs._part.length(); } bool operator != (const iterator& rhs) const { diff --git a/common/executor/executor.h b/common/executor/executor.h index 7c6811852..49a701cf1 100644 --- a/common/executor/executor.h +++ b/common/executor/executor.h @@ -41,7 +41,11 @@ class Executor { template < typename Context = AutoContext, typename Func, +#if __cplusplus < 201703L typename R = typename std::result_of::type, +#else + typename R = typename std::invoke_result::type, +#endif typename _ = typename std::enable_if::value, R>::type> R perform(Func &&act) { R result; @@ -60,7 +64,11 @@ class Executor { template < typename Context = AutoContext, typename Func, +#if __cplusplus < 201703L typename R = typename std::result_of::type, +#else + typename R = typename std::invoke_result::type, +#endif typename _ = typename std::enable_if::value, R>::type> void perform(Func &&act) { Awaiter aop; diff --git a/common/expirecontainer.h b/common/expirecontainer.h index 11557378b..74cea3695 100644 --- a/common/expirecontainer.h +++ b/common/expirecontainer.h @@ -463,3 +463,4 @@ class ObjectCache> Base::release(typename Base::Item(key), recycle, destroy)); } }; + diff --git 
a/common/iovector.h b/common/iovector.h index 5298e7d89..bd104902a 100644 --- a/common/iovector.h +++ b/common/iovector.h @@ -529,12 +529,15 @@ class iovector update(va); return ptr; } - + if (va.sum() < bytes) { + return nullptr; + } + auto buf = do_malloc(bytes); - auto ret = extract_front(bytes, buf); - return ret == bytes ? - buf : - nullptr; + if (buf) { + extract_front(bytes, buf); + } + return buf; } // try to extract `bytes` bytes from the back @@ -620,12 +623,15 @@ class iovector update(va); return ptr; } + if (va.sum() < bytes) { + return nullptr; + } auto buf = do_malloc(bytes); - auto ret = extract_back(bytes, buf); - return ret == bytes ? - buf : - nullptr; + if (buf) { + extract_back(bytes, buf); + } + return buf; } // copy data to a buffer of size `size`, diff --git a/common/lockfree_queue.h b/common/lockfree_queue.h index 383f03fc6..bc39451c6 100644 --- a/common/lockfree_queue.h +++ b/common/lockfree_queue.h @@ -295,7 +295,7 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase { size_t push_batch(const T *x, size_t n) { size_t rh, wt; - wt = tail.load(std::memory_order_relaxed); + wt = tail.load(std::memory_order_acquire); for (;;) { rh = head.load(std::memory_order_acquire); auto wn = std::min(n, Base::capacity - (wt - rh)); @@ -325,7 +325,7 @@ class LockfreeBatchMPMCRingQueue : public LockfreeRingQueueBase { size_t pop_batch(T *x, size_t n) { size_t rt, wh; - rt = read_tail.load(std::memory_order_relaxed); + rt = read_tail.load(std::memory_order_acquire); for (;;) { wh = write_head.load(std::memory_order_acquire); auto rn = std::min(n, wh - rt); diff --git a/common/objectcachev2.h b/common/objectcachev2.h index 106f8c030..1915353f2 100644 --- a/common/objectcachev2.h +++ b/common/objectcachev2.h @@ -38,7 +38,7 @@ class ObjectCacheV2 { const K key; std::shared_ptr ref; // prevent create multiple time when borrow - photon::spinlock createlock; + photon::mutex createlock{0}; // create timestamp, for cool-down of borrow uint64_t 
lastcreate = 0; // reclaim timestamp @@ -83,7 +83,7 @@ class ObjectCacheV2 { }; // protect object cache map - photon::spinlock maplock; + photon::mutex maplock{0}; // protect lru list std::unordered_set map; intrusive_list lru_list; diff --git a/common/range-lock.h b/common/range-lock.h index bf8f30570..ba1f21f76 100644 --- a/common/range-lock.h +++ b/common/range-lock.h @@ -18,6 +18,7 @@ limitations under the License. #include #include #include +#include class RangeLock { @@ -118,7 +119,7 @@ class RangeLock } uint64_t end() const { - return offset + length; + return photon::sat_add(offset, length); } bool operator < (const range_t& rhs) const { diff --git a/common/stream.cpp b/common/stream.cpp index 62a754216..e33cf9a9d 100644 --- a/common/stream.cpp +++ b/common/stream.cpp @@ -27,11 +27,14 @@ IStream::ReadAll IStream::readall(size_t max_buf, size_t min_buf) { buf.size = -buf.size; LOG_ERROR_RETURN(ENOBUFS, buf, "content size in stream exceeds upper limit ", max_buf); } - auto ptr = realloc(buf.ptr.get(), capacity *= 2); + capacity *= 2; + if ((size_t) capacity > max_buf) capacity = max_buf; + auto ptr = realloc(buf.ptr.get(), capacity); if (!ptr) { buf.size = -buf.size; LOG_ERROR_RETURN(ENOBUFS, buf, "failed to realloc(`)", capacity); } + buf.ptr.release(); buf.ptr.reset(ptr); } } diff --git a/common/test/test.cpp b/common/test/test.cpp index 8e11c380b..efe14ee7b 100644 --- a/common/test/test.cpp +++ b/common/test/test.cpp @@ -507,7 +507,7 @@ TEST(iovector, test1) EXPECT_EQ(iov.sum(), 0); EXPECT_TRUE(iov.empty()); EXPECT_EQ(iov.front_free_iovcnt(), IOVector::default_preserve); - EXPECT_EQ(iov.back_free_iovcnt(), IOVector::capacity - IOVector::default_preserve); + EXPECT_EQ(iov.back_free_iovcnt(), (uint16_t)IOVector::capacity - (uint16_t)IOVector::default_preserve); EXPECT_EQ(iov.begin(), iov.iovec()); iovec v{nullptr, 33}, v2{nullptr, 44}; @@ -523,7 +523,7 @@ TEST(iovector, test1) EXPECT_EQ(iov.back(), v); EXPECT_EQ(iov.iovcnt(), 3); 
EXPECT_EQ(iov.front_free_iovcnt(), IOVector::default_preserve - 3); - EXPECT_EQ(iov.back_free_iovcnt(), IOVector::capacity - IOVector::default_preserve); + EXPECT_EQ(iov.back_free_iovcnt(), (uint16_t)IOVector::capacity - (uint16_t)IOVector::default_preserve); EXPECT_EQ(iov.sum(), 44+55+33); iov.push_back(77); @@ -538,7 +538,7 @@ TEST(iovector, test1) EXPECT_EQ(iov.back(), v); EXPECT_EQ(iov.iovcnt(), 6); EXPECT_EQ(iov.front_free_iovcnt(), IOVector::default_preserve - 3); - EXPECT_EQ(iov.back_free_iovcnt(), IOVector::capacity - IOVector::default_preserve - 3); + EXPECT_EQ(iov.back_free_iovcnt(), (uint16_t)IOVector::capacity - (uint16_t)IOVector::default_preserve - 3); EXPECT_EQ(iov.sum(), 44+55+33 + 77+44+33); EXPECT_EQ(iov.pop_front(), 44); @@ -863,6 +863,51 @@ TEST(estring, test) EXPECT_EQ(a[1], "q3r1234"); EXPECT_EQ(a[2], "poiu"); + sp = s.split(cs, false); + it = sp.begin(); + front = *it; + remainder = it.remainder(); + LOG_DEBUG(VALUE(front), VALUE(remainder)); + EXPECT_EQ(front, "alskdjf"); + EXPECT_EQ(remainder, ";;,q3r1234;poiu"); + it ++; + front = *it; + remainder = it.remainder(); + LOG_DEBUG(VALUE(front), VALUE(remainder)); + EXPECT_EQ(front, ""); + EXPECT_EQ(remainder, ";,q3r1234;poiu"); + it ++; + front = *it; + remainder = it.remainder(); + LOG_DEBUG(VALUE(front), VALUE(remainder)); + EXPECT_EQ(front, ""); + EXPECT_EQ(remainder, ",q3r1234;poiu"); + it ++; + front = *it; + remainder = it.remainder(); + LOG_DEBUG(VALUE(front), VALUE(remainder)); + EXPECT_EQ(front, ""); + EXPECT_EQ(remainder, "q3r1234;poiu"); + it ++; + front = *it; + remainder = it.remainder(); + LOG_DEBUG(VALUE(front), VALUE(remainder)); + EXPECT_EQ(front, "q3r1234"); + EXPECT_EQ(remainder, "poiu"); + + a.clear(); + for (auto x: sp) + { + a.push_back(x); + LOG_DEBUG(x); + } + + EXPECT_EQ(a.size(), 6); + EXPECT_EQ(a[0], "alskdjf"); + EXPECT_EQ(a[4], "q3r1234"); + EXPECT_EQ(a[5], "poiu"); + + auto sv = s;//.view(); EXPECT_TRUE(sv.starts_with("alskdjf")); 
EXPECT_FALSE(sv.starts_with("alsk32")); @@ -872,9 +917,9 @@ TEST(estring, test) auto ps = estring::snprintf("%d%d%d", 2, 3, 4); EXPECT_EQ(ps, "234"); - estring as = " \tasdf \t\r\n"; + estring as = " \tasdf中文 \t\r\n"; auto trimmed = as.trim(); - EXPECT_EQ(trimmed, "asdf"); + EXPECT_EQ(trimmed, "asdf中文"); EXPECT_EQ(estring_view("234423").to_uint64(), 234423); EXPECT_EQ(estring_view("-234423").to_int64(), -234423); diff --git a/common/test/test_objcache.cpp b/common/test/test_objcache.cpp index 15919264b..f62cab6f6 100644 --- a/common/test/test_objcache.cpp +++ b/common/test/test_objcache.cpp @@ -22,6 +22,7 @@ limitations under the License. #undef private #undef protected + #include #include #include diff --git a/common/test/test_throttle.cpp b/common/test/test_throttle.cpp index bea7f14c0..acdc91475 100644 --- a/common/test/test_throttle.cpp +++ b/common/test/test_throttle.cpp @@ -205,14 +205,21 @@ TEST_P(FindAppropriateSliceNumTest, run) { t.consume(bs_per_io); bytes += bs_per_io; } - auto goal = bw * 10; + auto goal = bw * test_time_sec; auto diff = int64_t(bytes) - int64_t(goal); auto loss = double(std::abs(diff)) / double(goal); - LOG_INFO("Consume ` bytes in 10 seconds, loss ratio `", bytes, loss); + LOG_INFO("Consume ` bytes in ` seconds, loss ratio `", bytes, test_time_sec, loss); GTEST_ASSERT_LE(loss, p.performance_loss_max_ratio); } -INSTANTIATE_TEST_CASE_P(Throttle, FindAppropriateSliceNumTest, testing::Values( +#ifdef INSTANTIATE_TEST_SUITE_P +#define INSTANTIATE_TEST_P INSTANTIATE_TEST_SUITE_P +#else +#define INSTANTIATE_TEST_P INSTANTIATE_TEST_CASE_P +#endif + +INSTANTIATE_TEST_P(Throttle, + FindAppropriateSliceNumTest, testing::Values( FindAppropriateSliceNumSuite{10, 0.01}, FindAppropriateSliceNumSuite{50, 0.02}, FindAppropriateSliceNumSuite{100, 0.03}, @@ -243,9 +250,11 @@ struct PriorityTestSuite { }; class ThrottlePriorityTest : public testing::TestWithParam { +protected: + std::atomic running_{true}; }; -INSTANTIATE_TEST_CASE_P(Throttle, 
ThrottlePriorityTest, testing::Values( +INSTANTIATE_TEST_P(Throttle, ThrottlePriorityTest, testing::Values( PriorityTestSuite{ // 0. Simulate same priority and equally divide the BW PriorityTestSuite::Simulate, @@ -313,9 +322,9 @@ INSTANTIATE_TEST_CASE_P(Throttle, ThrottlePriorityTest, testing::Values( PriorityTestSuite{ // 7. Real socket. Low priority gets the rest BW that high priority doesn't need PriorityTestSuite::RealSocket, - 10'000'000, - {5'000'000, 10'000, photon::throttle::Priority::High}, - {100'000'000, 4'000'000, photon::throttle::Priority::Low}, + 100'000'000, + {50'000'000, 10'000, photon::throttle::Priority::High}, + {1'000'000'000, 4'000'000, photon::throttle::Priority::Low}, 0.4, 0.6, 0.4, 0.6, } @@ -323,9 +332,11 @@ INSTANTIATE_TEST_CASE_P(Throttle, ThrottlePriorityTest, testing::Values( static void run_real_socket(const std::atomic& running, const PriorityTestSuite& p, uint64_t& bw1, uint64_t& bw2) { + photon::semaphore stream_counter; photon::throttle t(p.limit_bw); uint64_t buf_size = std::max(p.io1.bs, p.io2.bs); auto server = photon::net::new_tcp_socket_server(); + ASSERT_NE(nullptr, server); DEFER(delete server); auto handler = [&](photon::net::ISocketStream* sock) -> int { @@ -335,24 +346,31 @@ static void run_real_socket(const std::atomic& running, const PriorityTest if (ret <= 0) break; photon::thread_yield(); } + stream_counter.signal(1); return 0; }; - server->setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); + + int ret = server->setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); + ASSERT_EQ(0, ret); server->set_handler(handler); - server->bind_v4any(0); - server->listen(); - server->start_loop(false); + ret = server->bind_v4any(0); + ASSERT_EQ(0, ret); + ret = server->listen(); + ASSERT_EQ(0, ret); + ret = server->start_loop(false); + ASSERT_EQ(0, ret); - photon::semaphore sem; auto server_ep = server->getsockname(); auto cli = photon::net::new_tcp_socket_client(); + ASSERT_NE(nullptr, cli); DEFER(delete cli); - photon::thread_create11([&] { + auto 
client_th1 = photon::thread_create11([&] { photon::throttle src(p.io1.bw); auto conn = cli->connect(server_ep); + if (!conn) exit(1); DEFER(delete conn); - char buf[buf_size]; + char buf[buf_size] = {}; while (running) { src.consume(p.io1.bs); ssize_t ret = conn->send(buf, p.io1.bs); @@ -360,13 +378,15 @@ static void run_real_socket(const std::atomic& running, const PriorityTest bw1 += p.io1.bs; t.consume(p.io1.bs, p.io1.prio); } - sem.signal(1); }); - photon::thread_create11([&] { + thread_enable_join(client_th1); + + auto client_th2 = photon::thread_create11([&] { photon::throttle src(p.io2.bw); auto conn = cli->connect(server_ep); + if (!conn) exit(1); DEFER(delete conn); - char buf[buf_size]; + char buf[buf_size] = {}; while (running) { src.consume(p.io2.bs); ssize_t ret = conn->send(buf, p.io2.bs); @@ -374,9 +394,13 @@ static void run_real_socket(const std::atomic& running, const PriorityTest bw2 += p.io2.bs; t.consume(p.io2.bs, p.io2.prio); } - sem.signal(1); }); - sem.wait(2); + thread_enable_join(client_th2); + + photon::thread_join((photon::join_handle*) client_th1); + photon::thread_join((photon::join_handle*) client_th2); + + stream_counter.wait(2); } static void run_simulate(const std::atomic& running, const PriorityTestSuite& p, @@ -409,26 +433,27 @@ TEST_P(ThrottlePriorityTest, run) { const uint64_t test_time_sec = 10; uint64_t bw1 = 0, bw2 = 0; - std::atomic running{true}; - std::thread([&] { + std::thread watcher([&] { ::sleep(test_time_sec); - running = false; - }).detach(); + running_ = false; + }); if (p.type == PriorityTestSuite::Simulate) - run_simulate(running, p, bw1, bw2); + run_simulate(running_, p, bw1, bw2); else if (p.type == PriorityTestSuite::RealSocket) - run_real_socket(running, p, bw1, bw2); + run_real_socket(running_, p, bw1, bw2); bw1 /= test_time_sec; bw2 /= test_time_sec; double ratio1 = double(bw1) / double(p.limit_bw); double ratio2 = double(bw2) / double(p.limit_bw); LOG_INFO(VALUE(bw1), VALUE(bw2), VALUE(ratio1), 
VALUE(ratio2)); - GTEST_ASSERT_GE(ratio1, p.bw1_ratio_min); - GTEST_ASSERT_LE(ratio1, p.bw1_ratio_max); - GTEST_ASSERT_GE(ratio2, p.bw2_ratio_min); - GTEST_ASSERT_LE(ratio2, p.bw2_ratio_max); + EXPECT_GE(ratio1, p.bw1_ratio_min); + EXPECT_LE(ratio1, p.bw1_ratio_max); + EXPECT_GE(ratio2, p.bw2_ratio_min); + EXPECT_LE(ratio2, p.bw2_ratio_max); + + watcher.join(); } #endif diff --git a/common/timeout.h b/common/timeout.h index 2e6e07ad0..c28face6b 100644 --- a/common/timeout.h +++ b/common/timeout.h @@ -15,9 +15,19 @@ limitations under the License. */ #pragma once -#include #include #include +#ifdef private +#define _PHOTON_UNIT_TEST +#undef private +#undef protected +#endif +#include +#ifdef _PHOTON_UNIT_TEST +#undef _PHOTON_UNIT_TEST +#define private public +#define protected public +#endif namespace photon { diff --git a/common/utility.h b/common/utility.h index 02196bacb..89567eed7 100644 --- a/common/utility.h +++ b/common/utility.h @@ -286,8 +286,13 @@ class OwnedPtr : OwnedPtr_Base } */ +#ifndef likely constexpr bool likely(bool expr) { return __builtin_expect(expr, true); } +#endif /* likely */ + +#ifndef unlikely constexpr bool unlikely(bool expr) { return __builtin_expect(expr, false); } +#endif /* unlikely */ int version_compare(std::string_view a, std::string_view b, int& result); int kernel_version_compare(std::string_view dst, int& result); diff --git a/doc/docs/introduction/how-to-build.md b/doc/docs/introduction/how-to-build.md index 0af88027a..e65d7254f 100644 --- a/doc/docs/introduction/how-to-build.md +++ b/doc/docs/introduction/how-to-build.md @@ -107,7 +107,7 @@ The examples and test code are built together. 
# Install additional dependencies dnf install epel-release dnf config-manager --set-enabled powertools -dnf install gtest-devel gmock-devel gflags-devel fuse-devel libgsasl-devel +dnf install gtest-devel gmock-devel gflags-devel fuse-devel libgsasl-devel nasm # Build examples and test code cmake -B build -D PHOTON_BUILD_TESTING=ON @@ -125,7 +125,7 @@ ctest ```bash # Install additional dependencies -apt install libgtest-dev libgmock-dev libgflags-dev libfuse-dev libgsasl7-dev +apt install libgtest-dev libgmock-dev libgflags-dev libfuse-dev libgsasl7-dev nasm # Build examples and test code cmake -B build -D PHOTON_BUILD_TESTING=ON diff --git a/ecosystem/CMakeLists.txt b/ecosystem/CMakeLists.txt index 6ab41bfb5..ac7aa4b34 100644 --- a/ecosystem/CMakeLists.txt +++ b/ecosystem/CMakeLists.txt @@ -3,7 +3,7 @@ include(FetchContent) -if(CMAKE_VERSION VERSION_GREATER_EQUAL "3.24.0") +if(CMAKE_VERSION VERSION_GREATER_EQUAL "3.24.0") # set DOWNLOAD_EXTRACT_TIMESTAMP ON cmake 3.24 or above cmake_policy(SET CMP0135 NEW) endif() @@ -11,6 +11,7 @@ endif() # Rapidjson FetchContent_Declare( rapidjson + GIT_SUBMODULES "" GIT_REPOSITORY ${PHOTON_RAPIDJSON_GIT} GIT_TAG v1.1.0 PATCH_COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/patches/rapidjson.patch @@ -25,9 +26,10 @@ message(STATUS "Rapidjson source dir: ${rapidjson_SOURCE_DIR}") # RapidXml FetchContent_Declare( rapidxml - URL ${PHOTON_RAPIDXML_SOURCE} - URL_HASH - SHA256=c3f0b886374981bb20fabcf323d755db4be6dba42064599481da64a85f5b3571 + GIT_SUBMODULES "" + GIT_REPOSITORY ${PHOTON_RAPIDXML_SOURCE} + GIT_TAG main + PATCH_COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/patches/rapidxml.patch UPDATE_DISCONNECTED 1) FetchContent_MakeAvailable(rapidxml) message(STATUS "Rapidxml source dir: ${rapidxml_SOURCE_DIR}") diff --git a/ecosystem/patches/rapidjson.patch b/ecosystem/patches/rapidjson.patch index 8abab12e3..a261dda5f 100644 --- a/ecosystem/patches/rapidjson.patch +++ b/ecosystem/patches/rapidjson.patch @@ -9,7 +9,7 @@ index 
19f8849b..618492a4 100644 + kParseBoolsAsStringFlag = 512, //!< Parse all booleans (true/false) as strings. kParseDefaultFlags = RAPIDJSON_PARSE_DEFAULT_FLAGS //!< Default parse flags. Can be customized by defining RAPIDJSON_PARSE_DEFAULT_FLAGS }; - + @@ -201,6 +202,8 @@ struct BaseReaderHandler { bool Default() { return true; } bool Null() { return static_cast(*this).Default(); } @@ -22,7 +22,7 @@ index 19f8849b..618492a4 100644 @@ -714,13 +717,22 @@ private: RAPIDJSON_PARSE_ERROR(kParseErrorValueInvalid, is.Tell()); } - + + template + void ParseRawBools(InputStream& is, Handler& handler) { + @@ -33,7 +33,7 @@ index 19f8849b..618492a4 100644 RAPIDJSON_ASSERT(is.Peek() == 't'); + auto begin = is.PutBegin(); is.Take(); - + if (RAPIDJSON_LIKELY(Consume(is, 'r') && Consume(is, 'u') && Consume(is, 'e'))) { - if (RAPIDJSON_UNLIKELY(!handler.Bool(true))) + auto copy = !(parseFlags & kParseInsituFlag); @@ -49,7 +49,7 @@ index 19f8849b..618492a4 100644 RAPIDJSON_ASSERT(is.Peek() == 'f'); + auto begin = is.PutBegin(); is.Take(); - + if (RAPIDJSON_LIKELY(Consume(is, 'a') && Consume(is, 'l') && Consume(is, 's') && Consume(is, 'e'))) { - if (RAPIDJSON_UNLIKELY(!handler.Bool(false))) + auto copy = !(parseFlags & kParseInsituFlag); @@ -59,3 +59,18 @@ index 19f8849b..618492a4 100644 RAPIDJSON_PARSE_ERROR(kParseErrorTermination, is.Tell()); } else +diff --git a/include/rapidjson/stream.h b/include/rapidjson/stream.h +index fef82c25..cd51ccd3 100644 +--- a/include/rapidjson/stream.h ++++ b/include/rapidjson/stream.h +@@ -147,8 +147,8 @@ struct GenericInsituStringStream { + GenericInsituStringStream(Ch *src) : src_(src), dst_(0), head_(src) {} + + // Read +- Ch Peek() { return *src_; } +- Ch Take() { return *src_++; } ++ Ch Peek() { return *src_ ? *src_ : '}'; } ++ Ch Take() { return *src_ ? 
*src_++ : '}'; } + size_t Tell() { return static_cast(src_ - head_); } + + // Write diff --git a/ecosystem/patches/rapidxml.patch b/ecosystem/patches/rapidxml.patch new file mode 100644 index 000000000..13e35f9c0 --- /dev/null +++ b/ecosystem/patches/rapidxml.patch @@ -0,0 +1,11 @@ +--- rapidxml.hpp ++++ rapidxml.hpp +@@ -2205,6 +2205,8 @@ + } + // Skip remaining whitespace after node name + skip(text); ++ if (*text == Ch('\0')) ++ return; // treat it as '>' without increament of text + if (*text != Ch('>')) + RAPIDXML_PARSE_ERROR("expected >", text); + ++text; // Skip '>' diff --git a/ecosystem/simple_dom.cpp b/ecosystem/simple_dom.cpp index 1bef4d7a9..463e7968a 100644 --- a/ecosystem/simple_dom.cpp +++ b/ecosystem/simple_dom.cpp @@ -7,9 +7,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -102,16 +104,43 @@ class DocNode : public NodeImpl { LOG_DEBUG(VALUE(depth), k, ':', v); } } - void set_children(vector&& nodes, bool _sort = true) { + struct Compare { + const DocNode* _this; + bool operator()(uint32_t i, uint32_t j) const { + auto si = _this->_children[i].get_key(); + auto sj = _this->_children[j].get_key(); + return si < sj || (si == sj && i < j); + } + bool operator()(str s, uint32_t j) const { + return s < _this->_children[j].get_key(); + } + bool operator()(uint32_t i, str s) const { + return _this->_children[i].get_key() < s; + } + }; + std::unique_ptr _index; + void set_children(vector&& nodes, bool _indexing = true) { if (nodes.empty()) return; assert(nodes.size() <= MAX_NCHILDREN); if (nodes.size() > MAX_NCHILDREN) nodes.resize(MAX_NCHILDREN); - if (_sort) - sort(nodes.begin(), nodes.end()); - nodes.back()._flags |= FLAG_IS_LAST; // must be after sort!!! 
- _nchildren = nodes.size(); _children = std::move(nodes); + _nchildren = _children.size(); + if (_indexing) { + _index.reset(new uint32_t[_nchildren]); + for (size_t i = 0; i < _nchildren; ++i) _index[i] = i; + std::sort(_index.get(), _index.get() + _nchildren, Compare{this}); + Derived *a, *b = &_children[_index[0]]; + for (size_t i = 1; i < _nchildren; ++i) { + a = b; b = &_children[_index[i]]; + if (a->get_key() != b->get_key()) + a->_flags |= FLAG_EQUAL_KEY_LAST; + } + b->_flags |= FLAG_EQUAL_KEY_LAST; + } + // user-side has no idea about # of children, + // so we use this flag to indicate ending + _children.back()._flags |= FLAG_IS_LAST; } ~DocNode() override { if (is_root()) { @@ -121,16 +150,20 @@ class DocNode : public NodeImpl { } } const NodeImpl* get(size_t i) const override { - return (i < _children.size()) ? &_children[i] : nullptr; + return (i < _nchildren) ? &_children[i] : nullptr; } const NodeImpl* get(str key) const override { if (_children.empty()) return nullptr; - for (size_t i = 0; i < _children.size() - 1; ++i) { + for (size_t i = 0; i < _nchildren - 1U; ++i) { assert((_children[i]._flags & FLAG_IS_LAST) == 0); } assert(_children.back()._flags & FLAG_IS_LAST); - auto it = std::lower_bound(_children.begin(), _children.end(), key); - return (it == _children.end() || it->get_key() != key) ? 
nullptr : &*it; + if (!_index) return nullptr; + auto end = _index.get() + _nchildren; + auto it = std::lower_bound(_index.get(), end, key, Compare{this}); + if (it == end || *it >= _nchildren || key != _children[*it].get_key()) + return nullptr; + return &_children[*it]; } }; @@ -144,26 +177,29 @@ struct JHandler : public BaseReaderHandler, JHandler> { vector> _nodes{1}; str _key; JNode* _root; - JHandler(const char* text, bool text_ownership) { + JHandler(const char* text) { assert(_nodes.size() == 1); - _root = new JNode(text, text_ownership); + _root = new JNode(text, false); } ~JHandler() { + delete _root; + } + JNode* get_root() { assert(_nodes.size() == 1); assert(_nodes.front().size() == 1); _root->set_children(std::move(_nodes.front().front()._children)); - } - JNode* get_root() { + DEFER(_root = nullptr); return _root; } - void emplace_back(const char* s, size_t length) { + void emplace_back(const char* s, size_t length, uint8_t type) { str val{s, length}; // _key may be empty() _nodes.back().emplace_back(_key, val, _root); + _nodes.back().back().set_type(type); // LOG_DEBUG(_key, ": ", val); _key = {}; } bool Null() { - emplace_back(0, 0); + emplace_back(0, 0, JNode::null); return true; } bool Key(const char* s, SizeType len, bool copy) { @@ -173,22 +209,22 @@ struct JHandler : public BaseReaderHandler, JHandler> { } bool String(const char* s, SizeType len, bool copy) { assert(!copy); - emplace_back(s, len); + emplace_back(s, len, JNode::STRING); return true; } bool RawNumber(const Ch* s, SizeType len, bool copy) { assert(!copy); // LOG_DEBUG(ALogString(s, len)); - emplace_back(s, len); + emplace_back(s, len, JNode::NUMBER); return true; } bool RawBool(const Ch* s, SizeType len, bool copy) { assert(!copy); - emplace_back(s, len); + emplace_back(s, len, JNode::BOOLEAN); return true; } bool StartObject() { - emplace_back(0, 0); + emplace_back(0, 0, JNode::OBJECT); _nodes.emplace_back(); return true; } @@ -196,16 +232,16 @@ struct JHandler : public 
BaseReaderHandler, JHandler> { commit(true); return true; } - void commit(bool sort) { + void commit(bool _indexing) { assert(_nodes.size() > 1); auto temp = std::move(_nodes.back()); _nodes.pop_back(); assert(_nodes.back().size() > 0); - // LOG_DEBUG(temp.size(), " elements to ", _nodes.back().back().get_key(), " sort=", sort); - _nodes.back().back().set_children(std::move(temp), sort); + // LOG_DEBUG(temp.size(), " elements to ", _nodes.back().back().get_key(), VALUE(_indexing)); + _nodes.back().back().set_children(std::move(temp), _indexing); } bool StartArray() { - emplace_back(0, 0); + emplace_back(0, 0, JNode::ARRAY); _nodes.emplace_back(); return true; } @@ -215,15 +251,52 @@ struct JHandler : public BaseReaderHandler, JHandler> { } }; +// As some parsers don't support text length, they only support null +// terminated strings, so we have to convert the last trailer to '\0', +// while making the parser to treat it as the trailer. +static bool fix_trail(estring_view text, char header, char trailer) { + auto i = text.rfind(trailer); + if (i == estring_view::npos) + LOG_ERROR_RETURN(EINVAL, false, "no trailer found"); + for (char c: text.substr(i+1)) + if (!isspace(c)) + LOG_ERROR_RETURN(EINVAL, false, "illegal character after trailer"); + size_t k = 0; + for (char c: text.substr(0, i+1)) { + if (!c) + LOG_ERROR_RETURN(EINVAL, false, "unexpected '\\0'"); + if (c == header) ++k; + else if (c == trailer) + if (k-- == 0) goto failed; + } + if (k != 0) failed: + LOG_ERROR_RETURN(EINVAL, false, "unbalanced header/trailer"); + auto c = &text[i]; + *(char*)c = '\0'; + return true; +} + static NodeImpl* parse_json(char* text, size_t size, int flags) { + // without kParseStopWhenDoneFlag, rapidjson will expect the last character of + // the JSON string to be '\0'. 
const auto kFlags = kParseNumbersAsStringsFlag | kParseBoolsAsStringFlag | - kParseInsituFlag | kParseCommentsFlag | kParseTrailingCommasFlag; - JHandler h(text, flags & DOC_FREE_TEXT_ON_DESTRUCTION); + kParseInsituFlag | kParseCommentsFlag | + kParseTrailingCommasFlag | kParseStopWhenDoneFlag; + if (!fix_trail({text, size}, '{', '}')) return nullptr; + JHandler h(text); using Encoding = UTF8<>; GenericInsituStringStream s(text); GenericReader reader; - reader.Parse(s, h); - return h.get_root(); + auto res = reader.Parse(s, h); + if (!res) { + LOG_ERROR("json parsing failed with error ` at `, origin size `", + res.Code(), res.Offset(), size); + return nullptr; + } + auto root = h.get_root(); + if (flags & DOC_FREE_TEXT_ON_DESTRUCTION) + root->_flags |= NodeImpl::FLAG_TEXT_OWNERSHIP; + return root; } using namespace rapidxml; @@ -266,12 +339,13 @@ class XMLNode : public DocNode { }; static NodeImpl* parse_xml(char* text, size_t size, int flags) { + if (!fix_trail({text, size}, '<', '>')) return nullptr; xml_document doc; doc.parse<0>(text); - auto root = new XMLNode(text, flags & DOC_FREE_TEXT_ON_DESTRUCTION); + auto root = make_unique(text, flags & DOC_FREE_TEXT_ON_DESTRUCTION); assert(root); root->build(&doc); - return root; + return root.release(); } class YAMLNode : public DocNode { @@ -296,15 +370,95 @@ class YAMLNode : public DocNode { }; static NodeImpl* parse_yaml(char* text, size_t size, int flags) { + ryml::s_default_callbacks.m_error = [](const char* msg, size_t msg_len, + ryml::Location location, + void* user_data) { + LOG_ERROR("yaml parsing failed with `", std::string_view(msg, msg_len)); + throw std::runtime_error("yaml parsing failed"); + }; auto yaml = ryml::parse_in_place({text, size}); - auto root = new YAMLNode(text, flags & DOC_FREE_TEXT_ON_DESTRUCTION); + auto root = make_unique(text, flags & DOC_FREE_TEXT_ON_DESTRUCTION); assert(root); root->build(yaml.rootref()); - return root; + return root.release(); +} + +class IniNode : public DocNode { 
+public: + using DocNode::DocNode; +}; + +using ini_handler = void (*)(void*, estring_view, estring_view, estring_view); +static int do_parse_ini(estring_view text, ini_handler h, void* user) { + int err_cnt = 0; + estring_view section; + for (auto line: text.split_lines()) { + auto comment = line.rfind(" ;"); + if (comment < line.size()) + line = line.substr(0, comment); + line = line.trim(); + if (line.empty() || line[0] == '#' || line[0] == ';') continue; + if (line[0] == '[') { + if (line.back() == ']') { + section = line.substr(1, line.size() - 2).trim(); + } else { + err_cnt++; + LOG_DEBUG("section with no ending: ", line); + } + } else { + auto eq = line.find_first_of(charset("=:")); + if (eq > 0 && eq < line.size() - 1) { + auto key = line.substr(0, eq).trim(); + auto val = line.substr(eq + 1).trim(); + h(user, section, key, val); + } else { + err_cnt++; + LOG_DEBUG("ill formed kv: ", line); + } + } + } + return -err_cnt; } static NodeImpl* parse_ini(char* text, size_t size, int flags) { - return nullptr; + struct Item { + estring_view section, key, val; + bool operator < (const Item& rhs) const { + return section < rhs.section; + } + }; + vector ctx; + auto handler = [](void* user, estring_view section, + estring_view key, estring_view val) { + auto ctx = (vector*)user; + ctx->push_back({section, key, val}); + LOG_DEBUG(VALUE(section), VALUE(key), VALUE(val)); + }; + int ret = do_parse_ini({text, size}, handler, &ctx); + if (ret < 0 && ctx.empty()) + LOG_ERROR_RETURN(-1, nullptr, "ini_parse_string_length() failed: ", ret); + + sort(ctx.begin(), ctx.end()); + vector sections, nodes; + estring_view prev_sect; + auto root = make_unique(text, flags & DOC_FREE_TEXT_ON_DESTRUCTION); + for (auto& x : ctx) { + if (prev_sect != x.section) { + prev_sect = x.section; + if (!nodes.empty() && !sections.empty()) { + sections.back().set_children(std::move(nodes)); + assert(nodes.empty()); + } + sections.emplace_back(x.section, str{}, root.get()); + } + 
nodes.emplace_back(x.key, x.val, root.get()); + } + if (!sections.empty()) { + if (!nodes.empty()) + sections.back().set_children(std::move(nodes)); + root->set_children(std::move(sections)); + } + return root.release(); } Node parse(char* text, size_t size, int flags) { @@ -314,11 +468,15 @@ Node parse(char* text, size_t size, int flags) { constexpr static Parser parsers[] = {&parse_json, &parse_xml, &parse_yaml, &parse_ini}; auto i = flags & DOC_TYPE_MASK; - if ((size_t) i > LEN(parsers)) { + if ((size_t) i >= LEN(parsers)) { if (flags & DOC_FREE_TEXT_IF_PARSING_FAILED) free(text); LOG_ERROR_RETURN(EINVAL, nullptr, "invalid document type ", HEX(i)); } - return parsers[i](text, size, flags); + NodeImpl* r = nullptr; + try { r = parsers[i](text, size, flags); } + catch(...) { LOG_ERROR("parsing failed and exception caught"); } + if (!r && (flags & DOC_FREE_TEXT_IF_PARSING_FAILED)) free(text); + return r; } Node parse_file(fs::IFile* file, int flags) { diff --git a/ecosystem/simple_dom.h b/ecosystem/simple_dom.h index e3ce65525..37236b043 100644 --- a/ecosystem/simple_dom.h +++ b/ecosystem/simple_dom.h @@ -80,6 +80,8 @@ class Node { const char* text_begin() const { IF_RET(_impl->get_root()->_text_begin); } str key(const char* b) const { IF_RET(_impl->get_key(b)); } str value(const char* b) const { IF_RET(_impl->get_value(b)); } + uint8_t type() const { IF_RET(_impl->get_type()); } + uint8_t get_type() const { IF_RET(_impl->get_type()); } bool valid() const { return _impl; } operator bool() const { return _impl; } size_t num_children() const { IF_RET(_impl->num_children()); } @@ -89,14 +91,20 @@ class Node { Node operator[](const char* key) const { return get(key); } Node operator[](size_t i) const { return get(i); } Node get_attributes() const { return get("__attributes__"); } - str to_string() const { return value(); } + str to_string_view() const { return value(); } #undef IF_RET - int64_t to_integer(int64_t def_val = 0) const { + int64_t to_int64_t(int64_t 
def_val = 0) const { return value().to_int64(def_val); } - double to_number(double def_val = NAN) const { + double to_double(double def_val = NAN) const { return value().to_double(def_val); } + bool to_bool() const { + assert(type() == TYPE::BOOLEAN); + auto v = value(); + return v.size() && (v[0] == 't' || v[0] == 'T'); + } + using TYPE = NodeImpl::TYPE; bool operator==(str rhs) const { return value() == rhs; } bool operator!=(str rhs) const { return value() != rhs; } @@ -105,19 +113,19 @@ class Node { bool operator>=(str rhs) const { return value() >= rhs; } bool operator> (str rhs) const { return value() > rhs; } - bool operator==(int64_t rhs) const { return to_integer() == rhs; } - bool operator!=(int64_t rhs) const { return to_integer() != rhs; } - bool operator<=(int64_t rhs) const { return to_integer() <= rhs; } - bool operator< (int64_t rhs) const { return to_integer() < rhs; } - bool operator>=(int64_t rhs) const { return to_integer() >= rhs; } - bool operator> (int64_t rhs) const { return to_integer() > rhs; } + bool operator==(int64_t rhs) const { return to_int64_t() == rhs; } + bool operator!=(int64_t rhs) const { return to_int64_t() != rhs; } + bool operator<=(int64_t rhs) const { return to_int64_t() <= rhs; } + bool operator< (int64_t rhs) const { return to_int64_t() < rhs; } + bool operator>=(int64_t rhs) const { return to_int64_t() >= rhs; } + bool operator> (int64_t rhs) const { return to_int64_t() > rhs; } - bool operator==(double rhs) const { return to_number() == rhs; } - bool operator!=(double rhs) const { return to_number() != rhs; } - bool operator<=(double rhs) const { return to_number() <= rhs; } - bool operator< (double rhs) const { return to_number() < rhs; } - bool operator>=(double rhs) const { return to_number() >= rhs; } - bool operator> (double rhs) const { return to_number() > rhs; } + bool operator==(double rhs) const { return to_double() == rhs; } + bool operator!=(double rhs) const { return to_double() != rhs; } + bool 
operator<=(double rhs) const { return to_double() <= rhs; } + bool operator< (double rhs) const { return to_double() < rhs; } + bool operator>=(double rhs) const { return to_double() >= rhs; } + bool operator> (double rhs) const { return to_double() > rhs; } struct SameKeyEnumerator; auto enumerable_same_key_siblings() const -> @@ -140,7 +148,8 @@ const int DOC_TYPE_MASK = 0xff; const int DOC_FREE_TEXT_IF_PARSING_FAILED = 0x100; const int DOC_FREE_TEXT_ON_DESTRUCTION = 0x200; -const int DOC_OWN_TEXT = 0x300; +const int DOC_OWN_TEXT = DOC_FREE_TEXT_IF_PARSING_FAILED | + DOC_FREE_TEXT_ON_DESTRUCTION; using Document = Node; @@ -153,10 +162,12 @@ Node parse(char* text, size_t size, int flags); inline Node parse(IStream::ReadAll&& buf, int flags) { if (!buf.ptr || buf.size <= 0) return nullptr; auto node = parse((char*)buf.ptr.get(), (size_t)buf.size, flags); - if (node || (flags & DOC_FREE_TEXT_IF_PARSING_FAILED)) { + if (node) { + buf.ptr.release(); + } else if (flags & DOC_FREE_TEXT_IF_PARSING_FAILED) { buf.ptr.reset(); - buf.size = 0; } + buf.size = 0; return node; } @@ -196,26 +207,19 @@ inline auto Node::enumerable_children() const -> } struct Node::SameKeyEnumerator : public Node::ChildrenEnumerator { - const char* _base; - str _key; + const char* _base = nullptr; SameKeyEnumerator(const NodeImpl* node) { - _impl = node; - if (node) { + if ((_impl = node)) { _base = node->get_root()->_text_begin; - _key = node->get_key(_base); - } else { - _base = nullptr; - assert(_key.empty()); } } int next() { - if (!valid()) return -1; - _impl = _impl->next_sibling(); - if (!valid()) return -1; - if (_impl->get_key(_base) != _key) { - _impl = nullptr; - return -1; - } + if (!valid() || (_impl->_flags & NodeImpl::FLAG_EQUAL_KEY_LAST)) return -1; + auto key = _impl->get_key(_base); + do { + _impl = _impl->next_sibling(); + if (!valid()) return -1; + } while (key != _impl->get_key(_base)); return 0; } }; diff --git a/ecosystem/simple_dom_impl.h b/ecosystem/simple_dom_impl.h 
index d9926d942..8f96275f0 100644 --- a/ecosystem/simple_dom_impl.h +++ b/ecosystem/simple_dom_impl.h @@ -37,9 +37,10 @@ using str = estring_view; class NodeImpl : public Object { protected: NodeImpl() = default; - const static uint8_t FLAG_IS_ROOT = 1; - const static uint8_t FLAG_IS_LAST = 2; - const static uint8_t FLAG_TEXT_OWNERSHIP= 4; + const static uint8_t FLAG_IS_ROOT = 1; // the root node + const static uint8_t FLAG_IS_LAST = 2; // last sibling node + const static uint8_t FLAG_EQUAL_KEY_LAST= 4; // last node with same key + const static uint8_t FLAG_TEXT_OWNERSHIP= 8; const static size_t MAX_NODE_SIZE = 256; const static size_t MAX_NCHILDREN = UINT16_MAX; const static size_t MAX_KEY_OFFSET = UINT32_MAX; @@ -47,25 +48,30 @@ class NodeImpl : public Object { const static size_t MAX_VALUE_OFFSET = 4095; const static size_t MAX_VALUE_LENGTH = MAX_KEY_OFFSET; + enum TYPE : uint8_t {STRING, NUMBER, null, BOOLEAN, OBJECT, ARRAY}; + +union { + const char* _text_begin; // addr of the text (only for root node) + const NodeImpl* _root; // root node (only for non-root nodes) +}; union { struct { // for non-root nodes - struct { - uint8_t _flags; - uint16_t _k_len : 12; // key length (12 bits) - uint16_t _v_off : 12; // value offset (12 bits) to key end - }__attribute__((packed)); - uint32_t _k_off; // key offset to _text_begin - const NodeImpl* _root; // root node - uint32_t _v_len; // value length -} ; // packed as 20 bytes + uint8_t _flags; + uint16_t _k_len : 12; // key length (12 bits) + uint16_t _v_off : 12; // value offset (12 bits) to key end +}__attribute__((packed)); struct { // for the root node uint8_t _flags_; // the same as _flags uint8_t _node_size; // sizeof(the node implementation) mutable uint16_t _refcnt; // reference counter of the document - uint32_t _k_off_; - const char* _text_begin; - uint32_t _v_len_; }; }; - uint16_t _nchildren; // for all nodes + uint32_t _k_off; // key offset to _text_begin + uint32_t _v_len; // value length + uint32_t 
_nchildren; // for all nodes + + void set_type(uint8_t type) { + _flags &= (1 << 5) - 1; + _flags |= (type & 0x7) << 5; + } using AT16 = std::atomic; static_assert(sizeof(AT16) == sizeof(_refcnt), "..."); @@ -103,14 +109,17 @@ struct { // for the root node bool is_root() const { return _flags & FLAG_IS_ROOT; } + uint8_t get_type() const { + return _flags >> 5; + } const NodeImpl* get_root() const { return is_root() ? this : _root; } str get_key() const { - return {get_root()->_text_begin + _k_off, _k_len}; + return get_key(get_root()->_text_begin); } str get_value() const { - return {get_key().end() + _v_off, _v_len}; + return get_value(get_root()->_text_begin); } str get_key(const char* text_begin) const { return {text_begin + _k_off, _k_len}; @@ -140,6 +149,7 @@ struct { // for the root node int init_non_root(str key, str value, const NodeImpl* root, uint32_t flags); }; +static_assert(sizeof(NodeImpl) == 32, ""); } } diff --git a/ecosystem/test/test_simple_dom.cpp b/ecosystem/test/test_simple_dom.cpp index 8216f3969..2737c7e84 100644 --- a/ecosystem/test/test_simple_dom.cpp +++ b/ecosystem/test/test_simple_dom.cpp @@ -30,7 +30,7 @@ using namespace std; using namespace photon::SimpleDOM; // OSS list response -const static char xml[] = R"( +static char xml[] = R"( examplebucket @@ -53,6 +53,7 @@ const static char xml[] = R"( 1305433xxx + asdf test100.txt 2020-05-26T07:50:20.000Z @@ -88,7 +89,7 @@ void print_all2(Node node) { static __attribute__((noinline)) int do_list_object(string_view prefix, ObjectList& result, string* marker) { - auto doc = parse_copy(xml, sizeof(xml), DOC_XML); + auto doc = parse(xml, sizeof(xml)-1, DOC_XML); EXPECT_TRUE(doc); auto list_bucket_result = doc["ListBucketResult"]; auto attr = list_bucket_result.get_attributes(); @@ -108,8 +109,8 @@ int do_list_object(string_view prefix, ObjectList& result, string* marker) { EXPECT_TRUE(key); auto size = child["Size"]; EXPECT_TRUE(size); - auto text = key.to_string(); - auto dsize = 
size.to_integer(); + auto text = key.to_string_view(); + auto dsize = size.to_int64_t(); LOG_DEBUG(VALUE(text), VALUE(dsize)); result.emplace_back(0, DT_REG, text.substr(prefix.size()), dsize, text.size() == prefix.size()); @@ -122,7 +123,7 @@ int do_list_object(string_view prefix, ObjectList& result, string* marker) { for (auto child: list_bucket_result.enumerable_children("CommonPrefixes")) { auto key = child["Prefix"]; EXPECT_TRUE(key); - auto dirname = key.to_string(); + auto dirname = key.to_string_view(); if (dirname.back() == '/') dirname.remove_suffix(1); // update_stat_cache(dirname, 0, OSS_DIR_MODE); dirname.remove_prefix(prefix.size()); @@ -130,7 +131,7 @@ int do_list_object(string_view prefix, ObjectList& result, string* marker) { } if (marker) { auto next_marker = list_bucket_result["NextMarker"]; - if (next_marker) *marker = next_marker.to_string(); + if (next_marker) *marker = next_marker.to_string_view(); else marker->clear(); } return 0; @@ -141,15 +142,9 @@ TEST(simple_dom, oss_list) { string marker; do_list_object("", list, &marker); static ObjectList truth = { - {0, DT_REG, "test100.txt", 1, false}, {0, DT_REG, "test10.txt", 1, false}, + {0, DT_REG, "test100.txt", 1, false}, }; - using T = decltype(truth[0]); - auto cmp = [](T& a, T& b) { - return std::get<2>(a) < std::get<2>(b); - }; - std::sort(truth.begin(), truth.end(), cmp); - std::sort(list.begin(), list.end(), cmp); EXPECT_EQ(list, truth); EXPECT_EQ(marker, "test100.txt"); } @@ -158,7 +153,7 @@ void expect_eq_kvs(Node node, const char * const * truth, size_t n) { for (size_t i = 0; i < n; ++i) { auto x = truth + i * 2; auto q = node[x[0]]; - LOG_DEBUG("expect node['`'] => '`' (got '`')", x[0], x[1], q.to_string()); + LOG_DEBUG("expect node['`'] => '`' (got `)", x[0], x[1], q.to_string_view()); EXPECT_EQ(q, x[1]); } } @@ -172,7 +167,7 @@ void expect_eq_vals(Node node, const char * const * truth, size_t n) { for (size_t i = 0; i < n; ++i) { auto x = truth[i]; auto q = node[i]; - 
LOG_DEBUG("expect node[`] => '`' (got '`')", i, x, q.to_string()); + LOG_DEBUG("expect node[`] => '`' (got '`')", i, x, q.to_string_view()); EXPECT_EQ(q, x); } } @@ -182,8 +177,20 @@ void expect_eq_vals(Node node, const char * const (&truth)[N]) { expect_eq_vals(node, truth, N); } +void expect_types(Node node, const std::pair* truth, size_t n) { + for (size_t i = 0; i < n; ++i) { + auto val = node[truth[i].first]; + EXPECT_EQ(val.type(), truth[i].second); + } +} + +template inline +void expect_types(Node node, const std::pair (&truth)[N]) { + return expect_types(node, truth, N); +} + TEST(simple_dom, json) { - const static char json0[] = R"({ + static char json0[] = R"({ "hello": "world", "t": true , "f": false, @@ -192,7 +199,7 @@ TEST(simple_dom, json) { "pi": 3.1416, "a": [1, 2, 3, 4], })"; - auto doc = parse_copy(json0, sizeof(json0), DOC_JSON); + auto doc = parse(json0, sizeof(json0)-1, DOC_JSON); EXPECT_TRUE(doc); expect_eq_kvs(doc, { {"hello", "world"}, @@ -201,14 +208,62 @@ TEST(simple_dom, json) { {"i", "-123"}, {"pi", "3.1416"}, }); - EXPECT_EQ(doc["i"].to_integer(), -123); - EXPECT_NEAR(doc["pi"].to_number(), 3.1416, 1e-5); + using TYPE = Node::TYPE; + expect_types(doc, {{"hello", TYPE::STRING}, {"t", TYPE::BOOLEAN}, + {"f", TYPE::BOOLEAN}, {"n", TYPE::null}, + {"i", TYPE::NUMBER}, {"pi", TYPE::NUMBER}, + {"a", TYPE::ARRAY}}); + EXPECT_EQ(doc["i"].to_int64_t(), -123); + EXPECT_NEAR(doc["pi"].to_double(), 3.1416, 1e-5); expect_eq_vals(doc["a"], {"1", "2", "3", "4"}); } +TEST(simple_dom, illegal_formats) { + auto ck_parse_copy = [](const std::vector& formats, int type) { + for (const auto& s : formats) { + auto doc = parse_copy((const char*)s.data(), s.size(), type); + EXPECT_FALSE(doc); + } + }; + auto ck_parse = [](std::vector& formats, int type) { + for (auto& s : formats) { + auto doc = parse((char*)s.data(), s.size(), type); + EXPECT_FALSE(doc); + } + }; + + std::vector xmls = {"xml1", "<>", "4"}; + ck_parse_copy(xmls, DOC_XML); + ck_parse(xmls, 
DOC_XML); + std::vector jsons = {"json1", "33{{json22}}22", "{3{json}}"}; + ck_parse_copy(jsons, DOC_JSON); + ck_parse(jsons, DOC_JSON); + std::vector yamls = {"[1,2,3,4,5", "{{a:1,b:}", "{yaml3}}"}; + ck_parse_copy(yamls, DOC_YAML); + ck_parse(yamls, DOC_YAML); + std::vector inis = {"ini1:", ":ini2", "ini3"}; + ck_parse_copy(inis, DOC_INI); + ck_parse(inis, DOC_INI); +} + +TEST(simple_dom, fix_trail) { + static char s0[] = R"({"a":"b"}})"; + static char s1[] = R"({"a":"b")"; + static char s2[] = R"({"a":"b"} )"; + static char s3[] = "{\"a\":\0\"b\"}"; + static std::pair json[] = { + {s0, false}, {s1, false}, {s2, true}, + {{s3, sizeof(s3) - 1}, false}, + }; + for (auto x: json) { + auto doc = parse((char*)x.first.data(), x.first.size(), DOC_JSON); + EXPECT_EQ(!!doc, x.second); + } +} + TEST(simple_dom, yaml0) { static char yaml0[] = "{foo: 1, bar: [2, 3], john: doe}"; - auto doc = parse(yaml0, sizeof(yaml0), DOC_YAML); + auto doc = parse(yaml0, sizeof(yaml0)-1, DOC_YAML); EXPECT_TRUE(doc); expect_eq_kvs(doc, {{"foo", "1"}, {"john", "doe"}}); expect_eq_vals(doc["bar"], {"2", "3"}); @@ -233,7 +288,7 @@ newmap: {} newmap (serialized): {} I am something: indeed )"; - auto doc = parse(yaml1, sizeof(yaml1), DOC_YAML); + auto doc = parse(yaml1, sizeof(yaml1)-1, DOC_YAML); EXPECT_TRUE(doc); expect_eq_kvs(doc, { {"foo", "says who"}, @@ -246,3 +301,93 @@ I am something: indeed expect_eq_vals(doc["bar"], {"20", "30", "oh so nice", "oh so nice (serialized)"}); } + +static char example_ini[] = R"( +[protocol] ; Protocol configuration +version=6 ; IPv6 + +[user] +name = Bob Smith ; Spaces around '=' are stripped +email = bob@smith.com ; And comments (like this) ignored +active = true ; Test a boolean +pi = 3.14159 ; Test a floating point number +trillion = 1000000000000 ; Test 64-bit integers + +[protocol] ; Protocol configuration + ver = 4 ; IPv4 + +[section1] +single1 = abc +single2 = xyz +[section3] +single: ghi +multi: the quick +name = bob smith ; comment line 1 + ; 
comment line 2 +foo = bar ;c1 + +[comment_test] +test1 = 1;2;3 ; only this will be a comment +test2 = 2;3;4;this won't be a comment, needs whitespace before ';' +test;3 = 345 ; key should be "test;3" +test4 = 4#5#6 ; '#' only starts a comment at start of line +#test5 = 567 ; entire line commented + # test6 = 678 ; entire line commented, except in MULTILINE mode +test7 = ; blank value, except if inline comments disabled +test8 =; not a comment, needs whitespace before ';' + +[colon_tests] +Content-Type: text/html +foo:bar +adams : 42 +funny1 : with = equals +funny2 = with : colons +funny3 = two = equals +funny4 : two : colons + + +)"; + +TEST(simple_dom, ini) { + auto doc = parse(example_ini, sizeof(example_ini)-1, DOC_INI); + EXPECT_TRUE(doc); + EXPECT_EQ(doc.num_children(), 6); + expect_eq_kvs(doc["protocol"], { + {"version", "6"}, + {"ver", "4"}, + }); + expect_eq_kvs(doc["user"], { + {"name", "Bob Smith"}, + {"email", "bob@smith.com"}, + {"active", "true"}, + {"pi", "3.14159"}, + {"trillion", "1000000000000"}, + }); + expect_eq_kvs(doc["section1"], { + {"single1", "abc"}, + {"single2", "xyz"}, + }); + expect_eq_kvs(doc["section3"], { + {"single", "ghi"}, + {"multi", "the quick"}, + {"name", "bob smith"}, + {"foo", "bar"}, + }); + expect_eq_kvs(doc["comment_test"], { + {"test1", "1;2;3"}, + {"test2", "2;3;4;this won't be a comment, needs whitespace before ';'"}, + {"test;3", "345"}, + {"test4", "4#5#6"}, + {"test7", ""}, + {"test8", "; not a comment, needs whitespace before ';'"}, + }); + expect_eq_kvs(doc["colon_tests"], { + {"Content-Type", "text/html"}, + {"foo", "bar"}, + {"adams", "42"}, + {"funny1", "with = equals"}, + {"funny2", "with : colons"}, + {"funny3", "two = equals"}, + {"funny4", "two : colons"}, + }); +} diff --git a/examples/perf/io-perf.cpp b/examples/perf/io-perf.cpp index 059823838..6779726ce 100644 --- a/examples/perf/io-perf.cpp +++ b/examples/perf/io-perf.cpp @@ -19,6 +19,8 @@ limitations under the License. 
#include #include +#include +#include #include #include @@ -26,30 +28,32 @@ limitations under the License. #include #include #include +#include const static size_t LAST_IO_BOUNDARY = 2 * 1024 * 1024; -static std::random_device rd; -static std::mt19937 gen(rd()); -static uint64_t qps = 0; +static std::atomic qps{0}; DEFINE_uint64(io_depth, 128, "io depth"); DEFINE_string(disk_path, "", "disk path. For example, /dev/nvme2n1"); DEFINE_uint64(disk_size, 0, "disk size. For example, 1000000000000. No need to align. Can be approximate number"); DEFINE_uint64(io_size, 4096, "io size"); DEFINE_bool(io_uring, false, "test io_uring or aio"); +DEFINE_bool(use_workpool, false, "dispatch read tasks to multi vCPU by using workpool"); +DEFINE_uint64(vcpu_num, 4, "vCPU num of the workpool"); #define ROUND_DOWN(N, S) ((N) & ~((S) - 1)) static uint64_t random(uint64_t N) { - std::uniform_int_distribution distrib(0, N); - return distrib(gen); + static std::random_device rd; + static std::mt19937_64 gen(rd()); + return gen() % N; } static void show_qps_loop() { while (true) { - photon::thread_sleep(1); - LOG_INFO("QPS: `, BW: ` MB/s", qps, qps * FLAGS_io_size / 1024 / 1024); - qps = 0; + std::this_thread::sleep_for(std::chrono::seconds(1)); + LOG_INFO("QPS: `, BW: ` MB/s", qps.load(), qps.load() * FLAGS_io_size / 1024 / 1024); + qps.store(0, std::memory_order_relaxed); } } @@ -63,7 +67,27 @@ static void infinite_read(const uint64_t max_offset, photon::fs::IFile* src_file LOG_ERROR("read fail, count `, offset `, ret `, errno `", count, offset, ret, ERRNO()); exit(1); } - qps++; + qps.fetch_add(1, std::memory_order_relaxed); + } +} + +static void infinite_read_by_work_pool(const uint64_t max_offset, photon::fs::IFile* src_file, + IOAlloc* alloc, photon::WorkPool* work_pool) { + size_t count = FLAGS_io_size; + void* buf = alloc->alloc(count); + while (true) { + photon::semaphore sem; + work_pool->async_call(new auto([&] { + uint64_t offset = ROUND_DOWN(random(max_offset), count); + int 
ret = src_file->pread(buf, FLAGS_io_size, offset); + if (ret != (int) count) { + LOG_ERROR("read fail, count `, offset `, ret `, errno `", count, offset, ret, ERRNO()); + exit(1); + } + qps.fetch_add(1, std::memory_order_relaxed); + sem.signal(1); + })); + sem.wait(1); } } @@ -82,12 +106,15 @@ int main(int argc, char** arg) { int io_engine = FLAGS_io_uring ? photon::INIT_IO_NONE : photon::INIT_IO_LIBAIO; int fs_io_engine = FLAGS_io_uring ? photon::fs::ioengine_iouring : photon::fs::ioengine_libaio; - int ret = photon::init(ev_engine, io_engine, photon::PhotonOptions{.libaio_queue_depth = 512}); + photon::PhotonOptions opt; + opt.use_pooled_stack_allocator = true; + opt.libaio_queue_depth = 512; + int ret = photon::init(ev_engine, io_engine, opt); if (ret != 0) { LOG_ERROR_RETURN(0, -1, "init failed"); } - photon::thread_create11(show_qps_loop); + new std::thread(show_qps_loop); // Read only open with direct-IO int flags = O_RDONLY | O_DIRECT; @@ -101,8 +128,17 @@ int main(int argc, char** arg) { AlignedAlloc io_alloc(4096); uint64_t max_offset = FLAGS_disk_size - LAST_IO_BOUNDARY; - for (uint64_t i = 0; i < FLAGS_io_depth; i++) { - photon::thread_create11(infinite_read, max_offset, file, &io_alloc); + photon::WorkPool* work_pool = nullptr; + if (FLAGS_use_workpool) { + work_pool = new photon::WorkPool(FLAGS_vcpu_num, ev_engine, io_engine, 0); + for (uint64_t i = 0; i < FLAGS_io_depth; i++) { + photon::thread_create11(infinite_read_by_work_pool, max_offset, file, &io_alloc, work_pool); + } + } else { + for (uint64_t i = 0; i < FLAGS_io_depth; i++) { + photon::thread_create11(infinite_read, max_offset, file, &io_alloc); + } } + photon::thread_sleep(-1); } \ No newline at end of file diff --git a/examples/perf/net-perf.cpp b/examples/perf/net-perf.cpp index 31b4a9a09..9bd1b98bb 100644 --- a/examples/perf/net-perf.cpp +++ b/examples/perf/net-perf.cpp @@ -72,7 +72,8 @@ static int ping_pong_client() { DEFER(delete cli); auto run_ping_pong_worker = [&]() -> int { - char 
buf[FLAGS_buf_size]; + auto buf = malloc(FLAGS_buf_size); + DEFER(free(buf)); auto conn = cli->connect(ep); if (conn == nullptr) { @@ -127,7 +128,8 @@ static int streaming_client() { DEFER(delete conn); auto send = [&]() -> int { - char buf[FLAGS_buf_size]; + auto buf = malloc(FLAGS_buf_size); + DEFER(free(buf)); while (!stop_test) { ssize_t ret = conn->write(buf, FLAGS_buf_size); if (ret != (ssize_t) FLAGS_buf_size) { @@ -137,7 +139,8 @@ static int streaming_client() { return 0; }; auto recv = [&]() -> int { - char buf[FLAGS_buf_size]; + auto buf = malloc(FLAGS_buf_size); + DEFER(free(buf)); while (!stop_test) { ssize_t ret = conn->read(buf, FLAGS_buf_size); if (ret != (ssize_t) FLAGS_buf_size) { @@ -191,7 +194,8 @@ static int echo_server() { if (FLAGS_vcpu_num > 1) { work_pool->thread_migrate(); } - char buf[FLAGS_buf_size]; + auto buf = malloc(FLAGS_buf_size); + DEFER(free(buf)); while (true) { ssize_t ret1, ret2; ret1 = sock->recv(buf, FLAGS_buf_size); diff --git a/examples/sync-primitive/sync-primitive.cpp b/examples/sync-primitive/sync-primitive.cpp index 131493b51..ae29b0833 100644 --- a/examples/sync-primitive/sync-primitive.cpp +++ b/examples/sync-primitive/sync-primitive.cpp @@ -77,8 +77,8 @@ int main() { message.sem.wait(1); auto end = std::chrono::steady_clock::now(); auto duration_us = std::chrono::duration_cast(end - message.start).count(); - latency.fetch_add(duration_us, std::memory_order::memory_order_relaxed); - qps.fetch_add(1, std::memory_order::memory_order_relaxed); + latency.fetch_add(duration_us, std::memory_order_relaxed); + qps.fetch_add(1, std::memory_order_relaxed); } })); } diff --git a/fs/async_filesystem.cpp b/fs/async_filesystem.cpp index 477ede11f..9f69b6da1 100644 --- a/fs/async_filesystem.cpp +++ b/fs/async_filesystem.cpp @@ -85,7 +85,11 @@ namespace fs { public: template::type > +#else + typename R = typename std::invoke_result::type> +#endif R perform(IF* _if, Func func, ARGS...args) { return th_performer().call(_if, func, 
args...); diff --git a/fs/exportfs.cpp b/fs/exportfs.cpp index d60b65464..10cbc160e 100644 --- a/fs/exportfs.cpp +++ b/fs/exportfs.cpp @@ -142,8 +142,13 @@ namespace fs __attribute__((visibility("hidden"))) Delegate ExportBase::op; __attribute__((visibility("hidden"))) ThreadPoolBase* ExportBase::pool = nullptr; +#if __cplusplus > 202000L +#define PERFORM(ID, expr) \ + perform(timeout, new auto([=, this]() { do_callback(ID, expr, done); })); +#else #define PERFORM(ID, expr) \ perform(timeout, new auto([=]() { do_callback(ID, expr, done); })); +#endif class ExportAsAsyncFile : public ExportBase, public IAsyncFile, public IAsyncFileXAttr { diff --git a/fs/extfs/extfs.cpp b/fs/extfs/extfs.cpp index e11068dab..0dc5fa31a 100644 --- a/fs/extfs/extfs.cpp +++ b/fs/extfs/extfs.cpp @@ -1298,6 +1298,9 @@ class ExtFileSystem : public photon::fs::IFileSystem, public photon::fs::IFileSy extfs_io_manager = new_io_manager(_image_file); } fs = do_ext2fs_open(extfs_io_manager); + if (fs == nullptr) { + return; + } memset(fs->reserved, 0, sizeof(fs->reserved)); auto reserved = reinterpret_cast(fs->reserved); reserved[0] = reinterpret_cast(this); @@ -1505,7 +1508,11 @@ int ExtFile::flush_buffer() { photon::fs::IFileSystem *new_extfs(photon::fs::IFile *file, bool buffer) { auto extfs = new ExtFileSystem(file, buffer); - return extfs->fs ? extfs : nullptr; + if (extfs->fs == nullptr) { + delete extfs; + return nullptr; + } + return extfs; } } diff --git a/fs/extfs/test/test.cpp b/fs/extfs/test/test.cpp index cec15f222..e9f3d6dfd 100644 --- a/fs/extfs/test/test.cpp +++ b/fs/extfs/test/test.cpp @@ -16,6 +16,7 @@ limitations under the License. #include "../extfs.h" #include #include +#include #include #include #include @@ -28,6 +29,7 @@ limitations under the License. 
#include #include #include "../../../test/gtest.h" +#include "photon/common/utility.h" #define FILE_SIZE (2 * 1024 * 1024) @@ -992,6 +994,19 @@ TEST_F(ExtfsTest, Xattr) { EXPECT_EQ(ENOENT, errno); } + +TEST_F(ExtfsTest, InvalidFs) { + std::string rootfs = "/tmp/invalid_fs.img"; + auto file = photon::fs::open_localfile_adaptor(rootfs.c_str(), O_CREAT|O_TRUNC|O_RDWR, 0666); + EXPECT_NE(nullptr, file); + DEFER({ + delete file; + ::unlink(rootfs.c_str()); + }); + auto err_fs = photon::fs::new_extfs(file, false); + EXPECT_EQ(err_fs, nullptr); +} + int main(int argc, char **argv) { photon::init(photon::INIT_EVENT_DEFAULT, photon::INIT_IO_DEFAULT); DEFER(photon::fini();); diff --git a/fs/localfs.cpp b/fs/localfs.cpp index e133bfadb..0ddaf280d 100644 --- a/fs/localfs.cpp +++ b/fs/localfs.cpp @@ -316,8 +316,9 @@ namespace fs virtual dirent* get() override { if (direntp) { - memcpy(&m_dirent, direntp, sizeof(m_dirent)); - return &m_dirent; + memcpy(&m_dirent, direntp, + std::min(sizeof(m_dirent), (size_t)direntp->d_reclen)); + return &m_dirent; } return direntp; } diff --git a/fs/path.cpp b/fs/path.cpp index 5d95b4db8..196df6c82 100644 --- a/fs/path.cpp +++ b/fs/path.cpp @@ -254,13 +254,14 @@ namespace fs } int ret = next(); if (ret < 0) - m_path = {0, 0}; + m_path_len = 0; } int Walker::enter_dir() { auto dir = m_filesystem->opendir(m_path_buffer); if (!dir) - LOG_ERRNO_RETURN(0, -1, "failed to opendir(`)", m_path); + LOG_ERRNO_RETURN(0, -1, "failed to opendir(`)", + std::string_view(m_path_buffer, m_path_len)); m_stack.emplace(dir); return 0; } @@ -273,36 +274,36 @@ namespace fs struct stat st; auto ret = m_filesystem->lstat(m_path_buffer, &st); if (ret < 0) - LOG_ERRNO_RETURN(0, -1, "failed to lstat '`'", m_path); + LOG_ERRNO_RETURN(0, -1, "failed to lstat '`'", + std::string_view(m_path_buffer, m_path_len)); return S_ISDIR(st.st_mode); } return 0; } - void Walker::path_push_back(string_view s) - { - auto len0 = m_path.length(); + void Walker::path_push_back(string_view 
s) { + auto len0 = m_path_len; auto len1 = s.length(); assert(len0 + len1 < sizeof(m_path_buffer) - 1); memcpy(m_path_buffer + len0, s.data(), len1 + 1); - m_path = string_view(m_path_buffer, len0 + len1); + m_path_len = len0 + len1; } - void Walker::path_pop_back(size_t len1) - { - auto len0 = m_path.length(); + void Walker::path_pop_back(size_t len1) { + auto len0 = m_path_len; assert(len0 > len1); len0 -= len1; m_path_buffer[len0] = '\0'; - m_path = string_view(m_path_buffer, len0); + m_path_len = len0; } int Walker::next() { again: - if (m_path.empty()) return -1; - if (m_path.back() != '/') + if (m_path_len == 0) return -1; + std::string_view path(m_path_buffer, m_path_len); + if (path.back() != '/') { - auto m = m_path.rfind('/'); - if (m != m_path.npos) { - auto len0 = m_path.length(); + auto m = path.rfind('/'); + if (m != path.npos) { + auto len0 = path.length(); path_pop_back(len0 - m - 1); m_stack.top()->next(); } @@ -321,10 +322,10 @@ namespace fs m_stack.pop(); if (m_stack.empty()) { - m_path.remove_prefix(m_path.length()); + m_path_len = 0; return -1; // finished walking } - assert(m_path.back() == '/'); + assert(m_path_buffer[m_path_len - 1] == '/'); path_pop_back(1); goto again; } diff --git a/fs/path.h b/fs/path.h index 98d52ada0..be01c4921 100644 --- a/fs/path.h +++ b/fs/path.h @@ -168,13 +168,13 @@ namespace fs { public: Walker(IFileSystem* fs, string_view path); - string_view path() { return m_path; } + string_view path() { return {m_path_buffer, m_path_len}; } string_view get() { return path(); } - bool valid() { return !m_path.empty(); } + bool valid() { return m_path_len; } int next(); protected: - string_view m_path; + size_t m_path_len = 0; IFileSystem* m_filesystem; std::stack> m_stack; char m_path_buffer[PATH_MAX]; diff --git a/fs/test/test.cpp b/fs/test/test.cpp index 02d73eebd..ccc1d51cd 100644 --- a/fs/test/test.cpp +++ b/fs/test/test.cpp @@ -571,12 +571,17 @@ TEST(AsyncFS, AsyncFS) tafs.do_test(f); } +#if __cplusplus < 202000 +#define 
CAPTURE = +#else +#define CAPTURE =,this +#endif class AFile : public ExampleAsyncFile { public: OVERRIDE_ASYNC(ssize_t, pread, void *buf, size_t count, off_t offset) { - std::thread t([=]() + std::thread t([CAPTURE]() { ::sleep(1); callback_umimplemented(done); @@ -588,7 +593,7 @@ class AFile : public ExampleAsyncFile { LOG_DEBUG("into afile pwrite `", timeout); - std::thread t([=]() + std::thread t([CAPTURE]() { ::usleep(timeout); //only return count/2 for timeout fired @@ -604,7 +609,7 @@ class ExampleAsyncDir: public AsyncDIR { OVERRIDE_ASYNC0(int, closedir) { } OVERRIDE_ASYNC0(dirent*, get) { - std::thread t([=]() + std::thread t([CAPTURE]() { ::usleep(timeout); AsyncResult r; @@ -631,7 +636,7 @@ class AFS : public ExampleAsyncFileSystem { public: OVERRIDE_ASYNC(IAsyncFile*, open, const char *pathname, int flags) { - std::thread t([=]() + std::thread t([CAPTURE]() { callback(done, UINT32_MAX, exampleAfile, 0); }); @@ -640,7 +645,7 @@ class AFS : public ExampleAsyncFileSystem { OVERRIDE_ASYNC(AsyncDIR*, opendir, const char *name) { - std::thread t([=]() + std::thread t([CAPTURE]() { ::usleep(timeout); if (name[0] == 's') { @@ -869,11 +874,8 @@ TEST(range_split_power2, basic) { } TEST(range_split_power2, random_test) { - uint64_t offset, length, interval; - offset = rand(); - length = rand(); - auto interval_shift = rand()%32 + 1; - interval = uint64_t(1) << interval_shift; + uint64_t offset = rand(), length = rand(); + uint64_t interval = 1UL << (rand()%32 + 1); fs::range_split_power2 split(offset, length, interval); EXPECT_EQ(offset, split.begin); EXPECT_EQ(offset + length, split.end); @@ -954,7 +956,8 @@ void random_content_rw_test(uint64_t test_block_size, uint64_t test_block_num, f } file->fsync(); file->lseek(0, SEEK_SET); - char buff[test_block_size]; + auto buff = malloc(test_block_size); + DEFER(free(buff)); for (const auto &data : rand_data) { file->read(buff, test_block_size); EXPECT_EQ(0, memcmp(data.get(), buff, test_block_size)); @@ -962,7 +965,8 
@@ void random_content_rw_test(uint64_t test_block_size, uint64_t test_block_num, f } void sequence_content_rw_test (uint64_t test_block_size, uint64_t test_block_num, const char* test_seq, fs::IFile* file) { - char data[test_block_size]; + auto data = malloc(test_block_size); + DEFER(free(data)); file->lseek(0, SEEK_SET); for (auto i: xrange(test_block_num)) { memset(data, test_seq[i], test_block_size); @@ -970,7 +974,8 @@ void sequence_content_rw_test (uint64_t test_block_size, uint64_t test_block_num } file->fdatasync(); file->lseek(0, SEEK_SET); - char buff[test_block_size]; + auto buff = malloc(test_block_size); + DEFER(free(buff)); for (uint64_t i = 0; i< test_block_num; i++) { file->read(buff, test_block_size); memset(data, *(test_seq++), test_block_size); @@ -1017,7 +1022,8 @@ TEST(XFile, fixed_size_linear_file_basic) { sequence_content_rw_test(test_block_size, test_file_num, "abcdefghijklmn", xf.get()); random_content_rw_test(test_block_size, test_file_num, xf.get()); xf->lseek(test_block_size*test_file_num, SEEK_SET); - char buff[test_block_size]; + auto buff = malloc(test_block_size); + DEFER(free(buff)); log_output = log_output_null; DEFER({ log_output = log_output_stdout; @@ -1088,7 +1094,8 @@ TEST(XFile, stripe_file_basic) { xfile_not_impl_test(xf.get()); sequence_content_rw_test(test_file_size, test_file_num, "abcdefghijklmn", xf.get()); random_content_rw_test(test_file_size, test_file_num, xf.get()); - char buff[test_block_size]; + auto buff = malloc(test_block_size); + DEFER(free(buff)); log_output = log_output_null; DEFER({ log_output = log_output_stdout; @@ -1362,6 +1369,7 @@ TEST(Walker, basic) { std::system(std::string("touch " + root + file2).c_str()); int count = 0; for (auto file : enumerable(Walker(srcFs, "/"))) { + LOG_INFO(VALUE(file.data())); if (file.back() == '2') { EXPECT_EQ(0, strcmp(file.data(), file2.c_str())); } else { diff --git a/io/iouring-wrapper.cpp b/io/iouring-wrapper.cpp index 8efe34ae1..254a33039 100644 --- 
a/io/iouring-wrapper.cpp +++ b/io/iouring-wrapper.cpp @@ -188,7 +188,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub ioCtx timer_ctx(true, false); __kernel_timespec ts; auto usec = timeout.timeout_us(); - if (usec < std::numeric_limits::max()) { + if (usec < (uint64_t)std::numeric_limits::max()) { sqe->flags |= IOSQE_IO_LINK; ts = usec_to_timespec(usec); sqe = _get_sqe(); @@ -334,7 +334,7 @@ class iouringEngine : public MasterEventEngine, public CascadingEventEngine, pub ssize_t wait_and_fire_events(uint64_t timeout) override { // Prepare own timeout - if (timeout > std::numeric_limits::max()) { + if (timeout > (uint64_t)std::numeric_limits::max()) { timeout = std::numeric_limits::max(); } diff --git a/net/curl.h b/net/curl.h index edbcaf199..d9d308afe 100644 --- a/net/curl.h +++ b/net/curl.h @@ -242,6 +242,7 @@ class cURL { setopt(CURLOPT_ERRORBUFFER, m_errmsg); setopt(CURLOPT_NOSIGNAL, 1L); setopt(CURLOPT_TCP_NODELAY, 1L); + setopt(CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); m_errmsg[0] = '\0'; } ~cURL() { curl_easy_cleanup(m_curl); } diff --git a/net/http/client.cpp b/net/http/client.cpp index 5847175ea..4409463d0 100644 --- a/net/http/client.cpp +++ b/net/http/client.cpp @@ -43,22 +43,18 @@ class PooledDialer { photon::mutex init_mtx; bool initialized = false; bool tls_ctx_ownership = false; - std::vector src_ips; // If there is a photon thread switch during construction, the constructor might be called // multiple times, even for a thread_local instance. Therefore, ensure that there is no photon // thread switch inside the constructor. Place the initialization work in init() and ensure it // is initialized only once. 
- PooledDialer() { - photon::fini_hook({this, &PooledDialer::at_photon_fini}); - } - int init(TLSContext *_tls_ctx, std::vector &src_ips) { if (initialized) return 0; SCOPED_LOCK(init_mtx); if (initialized) return 0; + photon::fini_hook({this, &PooledDialer::at_photon_fini}); tls_ctx = _tls_ctx; if (!tls_ctx) { tls_ctx_ownership = true; @@ -81,6 +77,8 @@ class PooledDialer { tcpsock.reset(); if (tls_ctx_ownership) delete tls_ctx; + initialized = false; + tls_ctx_ownership = false; } ISocketStream* dial(std::string_view host, uint16_t port, bool secure, @@ -107,7 +105,7 @@ ISocketStream* PooledDialer::dial(std::string_view host, uint16_t port, bool sec if (secure) { tlssock->timeout(timeout); sock = tlssock->connect(ep); - tls_stream_set_hostname(sock, host.data()); + tls_stream_set_hostname(sock, estring_view(host).extract_c_str()); } else { tcpsock->timeout(timeout); sock = tcpsock->connect(ep); @@ -222,7 +220,14 @@ class ClientImpl : public Client { LOG_ERROR_RETURN(0, ROUNDTRIP_NEED_RETRY, "send header failed, retry"); } sock->timeout(tmo.timeout()); - if (op->body_stream) { + if (op->body_buffer_size > 0) { + // send body_buffer + if (req.write(op->body_buffer, op->body_buffer_size) < 0) { + sock->close(); + req.reset_status(); + LOG_ERROR_RETURN(0, ROUNDTRIP_NEED_RETRY, "send body buffer failed, retry"); + } + } else if (op->body_stream) { // send body_stream if (req.write_stream(op->body_stream) < 0) { sock->close(); @@ -254,8 +259,10 @@ class ClientImpl : public Client { auto buf = malloc(kMinimalHeadersSize); resp.reset((char *)buf, kMinimalHeadersSize, true, sock.release(), true, req.verb()); } - if (op->resp.receive_header(tmo.timeout()) != 0) { + resp.reset_status(HEADER_SENT); + if (resp.receive_header(tmo.timeout()) != 0) { req.reset_status(); + resp.reset(nullptr, false); LOG_ERROR_RETURN(0, ROUNDTRIP_NEED_RETRY, "read response header failed"); } diff --git a/net/http/client.h b/net/http/client.h index 34d5f0ad9..7288eb85c 100644 --- 
a/net/http/client.h +++ b/net/http/client.h @@ -62,9 +62,10 @@ class Client : public Object { bool enable_proxy = false; std::string_view uds_path; // If set, Unix Domain Socket will be used instead of TCP. // URL should still be the format of http://localhost/xxx - IStream* body_stream = nullptr; // use body_stream as body - using BodyWriter = Delegate; // or call body_writer if body_stream - BodyWriter body_writer = {}; // is not set + + IStream* body_stream = nullptr; // priority: set_body > body_stream > body_writer + using BodyWriter = Delegate; + BodyWriter body_writer = {}; static Operation* create(Client* c, Verb v, std::string_view url, uint16_t buf_size = 64 * 1024 - 1) { @@ -91,9 +92,22 @@ class Client : public Object { uds_path = unix_socket_path; return _client->call(this); } + // set body buffer and set content length automatically + void set_body(const void *buf, size_t size) { + body_buffer = buf; + body_buffer_size = size; + req.headers.content_length(size); + } + void set_body(std::string_view buf) { + set_body(buf.data(), buf.length()); + } + protected: Client* _client; + const void *body_buffer = nullptr; + size_t body_buffer_size = 0; + char _buf[0]; Operation(Client* c, Verb v, std::string_view url, uint16_t buf_size) : req(_buf, buf_size, v, url, c->has_proxy()), @@ -106,6 +120,8 @@ class Client : public Object { explicit Operation(uint16_t buf_size) : req(_buf, buf_size), _client(nullptr) {} Operation() = delete; ~Operation() = default; + + friend class ClientImpl; }; template @@ -158,7 +174,7 @@ class Client : public Object { std::vector m_bind_ips; }; -//A Client without cookie_jar would ignore all response-header "Set-Cookies" +//A Client without cookie_jar would ignore all response-header "Set-Cookie" Client* new_http_client(ICookieJar *cookie_jar = nullptr, TLSContext *tls_ctx = nullptr); ICookieJar* new_simple_cookie_jar(); diff --git a/net/http/cookie_jar.cpp b/net/http/cookie_jar.cpp index 11e835670..5b3f1fba1 100644 --- 
a/net/http/cookie_jar.cpp +++ b/net/http/cookie_jar.cpp @@ -16,7 +16,6 @@ limitations under the License. #include #include - #include "client.h" #include "parser.h" #include @@ -30,12 +29,12 @@ namespace http { using namespace std; static uint64_t local_gmt_gap_us = 0; -uint64_t time_gmt_to_local(uint64_t local_now) { +static uint64_t time_gmt_to_local(uint64_t local_now) { if (local_gmt_gap_us == 0) { time_t now = time(nullptr); tm* gmt = gmtime(&now); auto now_s = mktime(gmt); - local_gmt_gap_us = (now - now_s) * 1000 * 1000; + local_gmt_gap_us = (now - now_s) * 1000ULL * 1000ULL; } return local_now + local_gmt_gap_us; } @@ -56,8 +55,8 @@ class SimpleCookie { public: unordered_map_string_key m_kv; int get_cookies_from_headers(Message* message) { - auto it = message->headers.find("Set-Cookies"); - while (it != message->headers.end() && it.first() == "Set-Cookies") { + auto it = message->headers.find("Set-Cookie"); + while (it != message->headers.end() && it.first() == "Set-Cookie") { LOG_INFO("get cookie"); auto Cookies = it.second(); Parser p(Cookies); @@ -84,8 +83,9 @@ class SimpleCookie { bool first_kv = true; vector eliminate; if (request->headers.insert("Cookie", "") != 0) return -1; + uint64_t now = time(0) * 1000ULL * 1000ULL; for (auto& it : m_kv) { - if (it.second.m_expire <= photon::now) { + if (it.second.m_expire <= now) { eliminate.emplace_back(it.first); continue; } diff --git a/net/http/headers.cpp b/net/http/headers.cpp index ef997f5cd..39ff1fed1 100644 --- a/net/http/headers.cpp +++ b/net/http/headers.cpp @@ -33,22 +33,17 @@ class HeaderAssistant { const HeadersBase* _h; HeaderAssistant(const HeadersBase* h) : _h(h) {} - int icmp(estring_view k1, estring_view k2) const { - int r = (int)(k1.size() - k2.size()); - return r ? 
r : strncasecmp(k1.data(), k2.data(), k1.size()); - } - bool equal_to(rstring_view16 _k, std::string_view key) const { - return icmp(_k | _h->m_buf, key) == 0; + return (_k | _h->m_buf).icmp(key) == 0; } bool less(rstring_view16 _k1, rstring_view16 _k2) const { - return icmp(_k1 | _h->m_buf, _k2 | _h->m_buf) < 0; + return (_k1 | _h->m_buf).icmp(_k2 | _h->m_buf) < 0; } bool less(rstring_view16 _k, std::string_view key) const { - return icmp(_k | _h->m_buf, key) < 0; + return (_k | _h->m_buf).icmp(key) < 0; } bool less(std::string_view key, rstring_view16 _k) const { - return icmp(key, _k | _h->m_buf) < 0; + return estring_view(key).icmp(_k | _h->m_buf) < 0; } bool operator()(HeadersBase::KV a, std::string_view b) const { return less(a.first, b); } diff --git a/net/http/message.cpp b/net/http/message.cpp index 39f60c8bd..1627b0b8d 100644 --- a/net/http/message.cpp +++ b/net/http/message.cpp @@ -45,6 +45,18 @@ Message::~Message() { free(m_buf); } +void Message::reset() { + if (m_stream && m_stream_ownership) { + delete m_stream; + } + headers.reset(); + m_buf_size = 0; + m_body_stream.reset(); + m_stream = nullptr; + m_stream_ownership = false; + reset_status(); +} + int Message::receive_header(uint64_t timeout) { auto tmo = Timeout(timeout); int ret = 0; @@ -174,7 +186,7 @@ ssize_t Message::write_stream(IStream *input, size_t size_limit) { if (input == nullptr) return 0; - size_t buf_size = 65536; + constexpr size_t buf_size = 65536; char seg_buf[buf_size + 4096]; char *aligned_buf = (char*) (((uint64_t)(&seg_buf[0]) + 4095) / 4096 * 4096); size_t ret = 0; diff --git a/net/http/message.h b/net/http/message.h index 9b85b9b3b..e786d0f35 100644 --- a/net/http/message.h +++ b/net/http/message.h @@ -69,15 +69,9 @@ class Message : public IStream { m_stream = s; m_stream_ownership = stream_ownership; } - void reset() { - headers.reset(); - m_buf_size = 0; - m_body_stream.reset(); - m_stream_ownership = false; - reset_status(); - } - void reset_status() { - message_status = 
INIT; + void reset(); + void reset_status(int status = INIT) { + message_status = status; } bool keep_alive() const { return m_keep_alive; @@ -98,9 +92,9 @@ class Message : public IStream { ssize_t write_stream(IStream *stream, size_t size_limit = -1); int close() override { return 0; } - // size of body + // size of body: infer from Content-Range/Content-Length in response header size_t body_size() const; - // size of origin resource + // size of origin resource: infer from Content-Range/Content-Length in response header ssize_t resource_size() const; // in general, it is called automatically diff --git a/net/http/server.cpp b/net/http/server.cpp index a2818f0b3..8bc88afbe 100644 --- a/net/http/server.cpp +++ b/net/http/server.cpp @@ -196,24 +196,25 @@ class FsHandler : public HTTPHandler { DEFER(LOG_DEBUG("leave fs handler")); auto target = req.target(); auto pos = target.find("?"); - std::string query; + estring_view query; if (pos != std::string_view::npos) { - query = std::string(target.substr(pos + 1)); + query = target.substr(pos + 1); target = target.substr(0, pos); } - estring filename(target); + estring_view filename(target); if (!prefix.empty()) filename = filename.substr(prefix.size() - 1); LOG_DEBUG(VALUE(filename)); - auto file = m_fs->open(filename.c_str(), O_RDONLY); + auto file = m_fs->open(filename.extract_c_str(), O_RDONLY); if (!file) { failed_resp(resp); LOG_ERROR_RETURN(0, 0, "open file ` failed", target); } DEFER(delete file); - if (!query.empty()) file->ioctl(fs::HTTP_URL_PARAM, query.c_str()); + if (!query.empty()) + file->ioctl(fs::HTTP_URL_PARAM, (const char*) query.extract_c_str()); struct stat buf; if (file->fstat(&buf) < 0) { @@ -300,7 +301,7 @@ class ForwardProxyHandler: public ProxyHandler { client, client_ownership) {} int tunnel_copy(ISocketStream *src, ISocketStream *dst) { - size_t buf_size = 65536; + constexpr size_t buf_size = 65536; char seg_buf[buf_size + 4096]; char *aligned_buf = (char*) (((uint64_t)(&seg_buf[0]) + 4095) / 
4096 * 4096); diff --git a/net/http/test/client_function_test.cpp b/net/http/test/client_function_test.cpp index 5fa4f2b87..dbb6a3d85 100644 --- a/net/http/test/client_function_test.cpp +++ b/net/http/test/client_function_test.cpp @@ -213,6 +213,18 @@ TEST(http_client, post) { ret = op2->resp.read(buf, 4096); EXPECT_EQ(ret, 7); EXPECT_EQ(0, strncmp(buf, "success", ret)); + + // body buffer test + auto op3 = client->new_operation(Verb::POST, target); + DEFER(client->destroy_operation(op3)); + void *body_buf = malloc(st.st_size); + EXPECT_EQ(st.st_size, file->pread(body_buf, st.st_size, 0)); + op3->set_body(body_buf, st.st_size); + client->call(op3); + EXPECT_EQ(200, op3->resp.status_code()); + ret = op3->resp.read(buf, 4096); + EXPECT_EQ(ret, 7); + EXPECT_EQ(0, strncmp(buf, "success", ret)); } @@ -551,8 +563,8 @@ TEST(http_client, vcpu) { auto client = new_http_client(); DEFER(delete client); - int vcpu_num = 16; photon::semaphore sem(0); + constexpr int vcpu_num = 16; std::thread th[vcpu_num]; for (int i = 0; i < vcpu_num; i++) { th[i] = std::thread([&] { diff --git a/net/http/test/cookie_jar_test.cpp b/net/http/test/cookie_jar_test.cpp index 5bbe33803..55b6ea8fa 100644 --- a/net/http/test/cookie_jar_test.cpp +++ b/net/http/test/cookie_jar_test.cpp @@ -49,7 +49,7 @@ class ResponseHeaderAdaptor : public Response { }; ResponseHeaderAdaptor* new_resp(std::string text) { - text = "HTTP/1.1 200 ok\r\nSet-Cookies: " + text + "\r\n\r\n"; + text = "HTTP/1.1 200 ok\r\nSet-Cookie: " + text + "\r\n\r\n"; ResponseHeaderAdaptor *ret = new ResponseHeaderAdaptor(); memcpy(ret->m_buf, text.data(), text.size()); ret->append_bytes(text.size()); diff --git a/net/http/url.cpp b/net/http/url.cpp index e5a534a68..9f282f666 100644 --- a/net/http/url.cpp +++ b/net/http/url.cpp @@ -103,14 +103,16 @@ static bool isunreserved(char c) { return false; } -std::string url_escape(std::string_view url) { +std::string url_escape(std::string_view url, bool escape_slash) { static const char hex[] = 
"0123456789ABCDEF"; std::string ret; ret.reserve(url.size() * 2); - for (auto c : url) { + for (unsigned char c : url) { if (isunreserved(c)) { ret.push_back(c); + } else if (!escape_slash && c == '/') { + ret.push_back(c); } else { ret += '%'; ret += hex[c >> 4]; @@ -124,7 +126,7 @@ std::string url_unescape(std::string_view url) { std::string ret; ret.reserve(url.size()); for (unsigned int i = 0; i < url.size(); i++) { - if (url[i] == '%') { + if (url[i] == '%' && i + 2 < url.size()) { auto c = estring_view(url.substr(i + 1, 2)).hex_to_uint64(); ret += static_cast(c); i += 2; diff --git a/net/http/url.h b/net/http/url.h index 453427123..a0dbe8d6b 100644 --- a/net/http/url.h +++ b/net/http/url.h @@ -109,7 +109,7 @@ inline bool need_optional_port(const URL& u) { return false; } -std::string url_escape(std::string_view url); +std::string url_escape(std::string_view url, bool escape_slash = true); std::string url_unescape(std::string_view url); } // namespace http diff --git a/net/kernel_socket.cpp b/net/kernel_socket.cpp index ef58d0487..a4e04f9cd 100644 --- a/net/kernel_socket.cpp +++ b/net/kernel_socket.cpp @@ -315,7 +315,7 @@ class KernelSocketServer : public SocketServerBase { int bind(const EndPoint& ep) override { auto s = sockaddr_storage(ep); if (m_listen_fd < 0) { - m_listen_fd = socket(s.get_sockaddr()->sa_family, 0, m_nonblocking, false); + m_listen_fd = socket(s.get_sockaddr()->sa_family, 0, m_nonblocking, true); if (m_listen_fd < 0) return -1; } if (m_opts.setsockopt(m_listen_fd) != 0) { @@ -333,7 +333,7 @@ class KernelSocketServer : public SocketServerBase { LOG_ERRNO_RETURN(0, -1, VALUE(path)); } if (m_listen_fd < 0) { - m_listen_fd = socket(AF_UNIX, 0, true, false); + m_listen_fd = socket(AF_UNIX, 0, true, true); if (m_listen_fd < 0) LOG_ERRNO_RETURN(0, m_listen_fd, "failed to create UNIX domain socket at ", ALogString(path, count)); } diff --git a/net/pooled_socket.cpp b/net/pooled_socket.cpp index 668798b76..07993bea0 100644 --- 
a/net/pooled_socket.cpp +++ b/net/pooled_socket.cpp @@ -159,7 +159,6 @@ class TCPSocketPool : public ForwardSocketClient { list.erase(node); if (list.empty()) fdmap.erase(it); rm_watch(node); - delete node; } public: @@ -250,6 +249,7 @@ class TCPSocketPool : public ForwardSocketClient { // socket shutdown drop_from_pool(nodes[i]); } + for (int i = 0; i < ret; i++) delete nodes[i]; } } }; diff --git a/net/socket.h b/net/socket.h index 415f4d79d..af68f436a 100644 --- a/net/socket.h +++ b/net/socket.h @@ -45,6 +45,7 @@ namespace net { public: union { in6_addr addr = {}; + // all data is in network byte order struct { uint16_t _1, _2, _3, _4, _5, _6; uint8_t a, b, c, d; }; } __attribute__((packed)); // For compatibility, the default constructor is still 0.0.0.0 (IPv4) @@ -96,6 +97,8 @@ namespace net { bool undefined() const { return mem_equal(V4Any()); } + void reset() { *this = IPAddr(); } + void clear() { reset(); } // Should ONLY be used for IPv4 address uint32_t to_nl() const { assert(is_ipv4()); @@ -167,7 +170,14 @@ namespace net { return !operator==(rhs); } bool undefined() const { - return addr.undefined(); + return addr.undefined() && port == 0; + } + void reset() { + addr.reset(); + port = 0; + } + void clear() { + reset(); } }; diff --git a/net/test/test-ipv6.cpp b/net/test/test-ipv6.cpp index b05d2f47c..f41836eda 100644 --- a/net/test/test-ipv6.cpp +++ b/net/test/test-ipv6.cpp @@ -14,7 +14,7 @@ TEST(ipv6, endpoint) { c = photon::net::EndPoint("[::1]:8888"); EXPECT_FALSE(c.undefined()); c = photon::net::EndPoint("0.0.0.0:8888"); - EXPECT_TRUE(c.undefined()); + EXPECT_FALSE(c.undefined()); EXPECT_EQ(8888, c.port); c = photon::net::EndPoint("::", 8888); EXPECT_TRUE(!c.undefined()); diff --git a/net/utils-stdstring.h b/net/utils-stdstring.h index 849b0440b..54d515853 100644 --- a/net/utils-stdstring.h +++ b/net/utils-stdstring.h @@ -42,8 +42,8 @@ inline std::string to_string(const IPAddr& addr) { inline estring to_string(const photon::net::EndPoint& ep) { char 
ip4or6[INET6_ADDRSTRLEN]; __to_string(ep.addr, ip4or6); - return ep.is_ipv4() ? estring().appends(ip4or6, ':', ep.port): - estring().appends('[', ip4or6, "]:", ep.port); + return ep.is_ipv4() ? estring().appends(ip4or6, ":", ep.port): + estring().appends("[", ip4or6, "]:", ep.port); } } diff --git a/net/utils.cpp b/net/utils.cpp index 802aad475..3419d2e8c 100644 --- a/net/utils.cpp +++ b/net/utils.cpp @@ -263,38 +263,50 @@ class DefaultResolver : public Resolver { IPAddr addr; IPAddrNode(IPAddr addr) : addr(addr) {} }; - using IPAddrList = intrusive_list; - + struct IPAddrList : public intrusive_list, spinlock { + ~IPAddrList() { delete_all(); } + }; + struct ResolveCtx { + std::string host; + Delegate filter; + spinlock lock; + IPAddrList *addrs; + photon::semaphore sem; + }; IPAddr do_resolve(std::string_view host, Delegate filter) { auto ctr = [&]() -> IPAddrList* { - auto addrs = new IPAddrList(); - photon::semaphore sem; - std::thread([&]() { - auto now = std::chrono::steady_clock::now(); - IPAddrList ret; + std::unique_ptr addrs(new IPAddrList()); + auto ctx = std::make_shared(); + ctx->addrs = addrs.get(); + ctx->host = std::string(host); + ctx->filter = filter; + std::thread([ctx]() { auto cb = [&](IPAddr addr) -> int { - if (filter && !filter.fire(addr)) - return 0; - ret.push_back(new IPAddrNode(addr)); + SCOPED_LOCK(ctx->lock); + if (ctx->filter && !ctx->filter.fire(addr)) return 0; + if (ctx->addrs) { + ctx->addrs->push_back(new IPAddrNode(addr)); + } return 0; }; - _gethostbyname(host, cb); - auto time_elapsed = std::chrono::duration_cast( - std::chrono::steady_clock::now() - now).count(); - if ((uint64_t)time_elapsed <= resolve_timeout_) { - addrs->push_back(std::move(ret)); - sem.signal(1); - } else { - LOG_ERROR("resolve timeout"); - while(!ret.empty()) - delete ret.pop_front(); - } + _gethostbyname(ctx->host, cb); + ctx->sem.signal(1); }).detach(); - sem.wait(1, resolve_timeout_); - return addrs; + ctx->sem.wait(1, resolve_timeout_); + 
SCOPED_LOCK(ctx->lock); + ctx->addrs = nullptr; + ctx->filter = {}; + if (addrs->empty()) { + return nullptr; + } + return addrs.release(); }; - auto ips = dnscache_.borrow(host, ctr); - if (ips->empty()) LOG_ERRNO_RETURN(0, IPAddr(), "Domain resolution for '`' failed", host); + auto ips = dnscache_.borrow(host, ctr, 1UL * 1000); + if (!ips || ips->empty()) { + ips.recycle(true); + LOG_ERRNO_RETURN(0, IPAddr(), "Domain resolution for '`' failed", host); + } + SCOPED_LOCK(*ips); auto ret = ips->front(); ips->node = ret->next(); // access in round robin order return ret->addr; @@ -303,13 +315,6 @@ class DefaultResolver : public Resolver { public: DefaultResolver(uint64_t cache_ttl, uint64_t resolve_timeout) : dnscache_(cache_ttl), resolve_timeout_(resolve_timeout) {} - ~DefaultResolver() { - for (auto it : dnscache_) { - ((IPAddrList*)it->_obj)->delete_all(); - } - dnscache_.clear(); - } - IPAddr resolve(std::string_view host) override { return do_resolve(host, nullptr); } @@ -321,12 +326,14 @@ class DefaultResolver : public Resolver { void discard_cache(std::string_view host, IPAddr ip) override { auto ipaddr = dnscache_.borrow(host); if (ip.undefined() || ipaddr->empty()) { - ipaddr->delete_all(); ipaddr.recycle(true); } else { + IPAddrNode* node = nullptr; + DEFER(delete node); + SCOPED_LOCK(*ipaddr); for (auto itr = ipaddr->rbegin(); itr != ipaddr->rend(); itr++) { if ((*itr)->addr == ip) { - ipaddr->erase(*itr); + ipaddr->erase(node = *itr); break; } } diff --git a/photon.cpp b/photon.cpp index ab937d3a2..7f28b7b57 100644 --- a/photon.cpp +++ b/photon.cpp @@ -131,6 +131,7 @@ int fini() { for (auto h : get_hook_vector()) { h.fire(); } + get_hook_vector().clear(); #ifdef __linux__ FINI_IO(LIBAIO, libaio_wrapper) FINI_IO(SOCKET_EDGE_TRIGGER, et_poller) diff --git a/rpc/rpc.cpp b/rpc/rpc.cpp index dbfecb101..0d9e06181 100644 --- a/rpc/rpc.cpp +++ b/rpc/rpc.cpp @@ -236,6 +236,8 @@ namespace rpc { COPY(func); COPY(stream); COPY(sk); + COPY(stream_serv_count); + 
COPY(stream_cv); COPY(w_lock); #undef COPY } @@ -375,14 +377,14 @@ namespace rpc { } static void* async_serve(void* args_) { - auto ctx = (Context*)args_; - Context context(std::move(*ctx)); - ctx->got_it = true; + bool &got_it = ((Context*)args_)->got_it; + Context context(std::move(*(Context*)args_)); + got_it = true; thread_yield(); context.serve_request(); // serve done, here reduce refcount - (*ctx->stream_serv_count) --; - ctx->stream_cv->notify_all(); + (*context.stream_serv_count) --; + context.stream_cv->notify_all(); return nullptr; } virtual int shutdown(bool no_more_requests) override { diff --git a/rpc/rpc.h b/rpc/rpc.h index eb195342a..6446db3d6 100644 --- a/rpc/rpc.h +++ b/rpc/rpc.h @@ -250,7 +250,7 @@ namespace rpc extern "C" Skeleton* new_skeleton(uint32_t pool_size = 128); __attribute__((deprecated)) - inline Skeleton* new_skeleton(bool /*concurrent*/, uint32_t pool_size = 128) { + inline Skeleton* new_skeleton(bool /*concurrent*/, uint32_t pool_size) { return new_skeleton(pool_size); } diff --git a/rpc/serialize.h b/rpc/serialize.h index df5763db3..a118a12a6 100644 --- a/rpc/serialize.h +++ b/rpc/serialize.h @@ -436,7 +436,7 @@ namespace rpc void process_field(buffer& x) { - if (x.size()==0) + if (x.size() == 0) return; x._ptr = _iov->extract_front_continuous(x.size()); if (!x._ptr) @@ -462,13 +462,9 @@ namespace rpc "only Messages are permitted"); // deserialize the main body from back - _iov=iov; - auto t = iov -> extract_back(); - if (t) { - if (!t->validate_checksum(iov, t, sizeof(*t))) { - failed = true; - return nullptr; - } + _iov = iov; + auto t = iov->extract_back(); + if (t && t->validate_checksum(iov, t, sizeof(*t))) { // deserialize aligned fields, and non-aligned fields, from front auto aligned = FilterAlignedFields(this, true); t->process_fields(aligned); @@ -477,7 +473,7 @@ namespace rpc } else { failed = true; } - return t; + return failed ? 
nullptr : t; } }; diff --git a/rpc/test/test-rpc-message.cpp b/rpc/test/test-rpc-message.cpp index 1a632d04c..f8ff12e5a 100644 --- a/rpc/test/test-rpc-message.cpp +++ b/rpc/test/test-rpc-message.cpp @@ -228,7 +228,7 @@ TEST(rpc, variable_length_serialization) { char channel[4096] = {}; // Send. - const char* send_string = "1"; + const char send_string[] = "1"; VariableLengthMessage m_send; m_send.a = 1; m_send.b.assign(send_string); @@ -243,7 +243,7 @@ TEST(rpc, variable_length_serialization) { // Receive. rpc::string has been assigned to a large buffer // Serialize first VariableLengthMessage m_recv; - const size_t buf_size = strlen(send_string) + 4096; + constexpr size_t buf_size = sizeof(send_string) + 4096; char buf[buf_size]; m_recv.b = photon::rpc::string(buf, buf_size); photon::rpc::SerializerIOV s_recv; diff --git a/thread/std-compat.h b/thread/std-compat.h index eb52de406..e78b69ce0 100644 --- a/thread/std-compat.h +++ b/thread/std-compat.h @@ -295,7 +295,7 @@ class condition_variable : public photon::condition_variable { template cv_status wait_until(unique_lock& lock, const ::std::chrono::time_point& t) { - auto d = t - ::std::chrono::steady_clock::now(); + auto d = t - Clock::now(); uint64_t timeout = __duration_to_microseconds(d); int ret = photon::condition_variable::wait(lock.mutex(), timeout); if (ret == 0) diff --git a/thread/test/test.cpp b/thread/test/test.cpp index b1edc3e6f..ade524580 100644 --- a/thread/test/test.cpp +++ b/thread/test/test.cpp @@ -490,6 +490,31 @@ TEST(Timer, Reapting) // timer_destroy(timer_arg.ptimer); } +TEST(Timer, Reapting2) +{ + timer_count = 5; + t0 = now_time(); + auto cb = [](void* arg) -> uint64_t { + LOG_INFO("timer callback"); + auto t1 = now_time(); + auto delta_t = t1 - t0; + EXPECT_GT(delta_t, 1 * 1000 * 1000); + EXPECT_LT(delta_t, 2 * 1000 * 1000); + t0 = t1; + LOG_INFO(VALUE(delta_t)); + LOG_INFO(VALUE(timer_count)); + auto& timer_count = *(int*)arg; + timer_count--; + return 0; + }; + Timer timer(1000*1000, 
{cb, &timer_count}); + while(timer_count >= 0) { + usleep(200000); // 200ms. Simulate the situation of cpu usage + std::cout << "ready to yield" << std::endl; + photon::thread_yield(); + } +} + TEST(Thread, function) { photon::join_handle* th1 = photon::thread_enable_join(photon::thread_create(&asdf, (void*)0)); @@ -621,6 +646,25 @@ TEST(thread11, test) } } +TEST(thread11, dtor) +{ + struct Foo { + Foo() { LOG_DEBUG("ctor ", this); } + Foo(const Foo&) { LOG_DEBUG("copy ctor ", this); } + Foo(Foo&&) = delete; + ~Foo() { LOG_DEBUG("dtor ", this); } + int x = 100; + }; + Foo foo; + LOG_DEBUG("thread_create11"); + auto th = thread_create11([foo]{ + LOG_DEBUG(&foo, ":thread"); + LOG_DEBUG(foo.x); + }); + auto jh = thread_enable_join(th); + thread_join(jh); + LOG_DEBUG("end"); +} void semaphore_test_hold(semaphore* sem, int &step) { int ret = 0; diff --git a/thread/thread.cpp b/thread/thread.cpp index 5ebbbf314..a3254d6e0 100644 --- a/thread/thread.cpp +++ b/thread/thread.cpp @@ -192,7 +192,7 @@ namespace photon uint64_t rwlock_mark; void* retval; }; - char* buf; + char* buf = nullptr; char* stackful_alloc_top; size_t stack_size; // offset 96B @@ -290,6 +290,53 @@ namespace photon } }; +#if defined(__has_feature) +# if __has_feature(address_sanitizer) // for clang +# ifndef __SANITIZE_ADDRESS__ +# define __SANITIZE_ADDRESS__ // GCC already sets this +# endif +# endif +#endif + +#ifdef __SANITIZE_ADDRESS__ + extern "C" { + // Check out sanitizer/asan-interface.h in compiler-rt for documentation. + void __sanitizer_start_switch_fiber(void** fake_stack_save, const void* bottom, + size_t size); + void __sanitizer_finish_switch_fiber(void* fake_stack_save, + const void** bottom_old, size_t* size_old); + } + + static void asan_start(void** save, thread* to) { + void* bottom = to->buf ? 
to->buf : to->stackful_alloc_top; + __sanitizer_start_switch_fiber(save, bottom, + to->stack_size); + } + + static void asan_finish(void* save) { + __sanitizer_finish_switch_fiber(save, nullptr, nullptr); + } + +#define ASAN_START() asan_finish((void*)nullptr); + +#define ASAN_SWITCH(to) \ + void* __save; \ + asan_start(&__save, to); \ + DEFER({ asan_finish(__save); }); + +#define ASAN_DIE_SWITCH(to) \ + asan_start(nullptr, to); + +#else +#define ASAN_START(ptr) +#define ASAN_SWITCH(to) +#define ASAN_DIE_SWITCH(to) +#endif + + static void _asan_start() asm("_asan_start"); + + __attribute__((used)) static void _asan_start() { ASAN_START(); } + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Winvalid-offsetof" static_assert(offsetof(thread, vcpu) == offsetof(partial_thread, vcpu), "..."); @@ -588,6 +635,8 @@ namespace photon } Switch try_goto(thread* th) const { assert(th->vcpu == vcpu); + th->remove_from_list(); + current->insert_after(th); return _do_goto(th); } bool single() const { @@ -653,7 +702,8 @@ namespace photon inline void prepare_switch(thread* from, thread* to) { assert(from->vcpu == to->vcpu); assert(to->state == states::RUNNING); - to->get_vcpu()->switch_count++; + auto& cnt = to->get_vcpu()->switch_count; + (*(uint64_t*)&cnt)++; // increment of volatile variable is deprecated } static void _photon_thread_die(thread* th) asm("_photon_thread_die"); @@ -685,6 +735,7 @@ R"( DEF_ASM_FUNC(_photon_thread_stub) R"( + call _asan_start mov 0x40(%rbp), %rdi movq $0, 0x40(%rbp) call *0x48(%rbp) @@ -695,6 +746,7 @@ R"( ); inline void switch_context(thread* from, thread* to) { + ASAN_SWITCH(to); prepare_switch(from, to); auto _t_ = to->stack.pointer_ref(); register auto f asm("rsi") = from->stack.pointer_ref(); @@ -708,6 +760,7 @@ R"( inline void switch_context_defer(thread* from, thread* to, void (*defer)(void*), void* arg) { + ASAN_SWITCH(to); prepare_switch(from, to); auto _t_ = to->stack.pointer_ref(); register auto f asm("rcx") = 
from->stack.pointer_ref(); @@ -747,6 +800,7 @@ R"( DEF_ASM_FUNC(_photon_thread_stub) R"( + call _asan_start mov 0x40(%rbp), %rcx movq $0, 0x40(%rbp) call *0x48(%rbp) @@ -757,6 +811,7 @@ R"( ); inline void switch_context(thread* from, thread* to) { + ASAN_SWITCH(to); prepare_switch(from, to); auto _t_ = to->stack.pointer_ref(); register auto f asm("rdx") = from->stack.pointer_ref(); @@ -772,6 +827,7 @@ R"( inline void switch_context_defer(thread* from, thread* to, void (*defer)(void*), void* arg) { + ASAN_SWITCH(to); prepare_switch(from, to); auto _t_ = to->stack.pointer_ref(); register auto f asm("r9") = from->stack.pointer_ref(); @@ -820,6 +876,7 @@ R"( DEF_ASM_FUNC(_photon_thread_stub) R"( + bl _asan_start //; asan_start() ldp x0, x1, [x29, #0x40] //; load arg, start into x0, x1 str xzr, [x29, #0x40] //; set arg as 0 blr x1 //; start(x0) @@ -835,6 +892,7 @@ R"( #endif inline void switch_context(thread* from, thread* to) { + ASAN_SWITCH(to); prepare_switch(from, to); auto _t_ = to->stack.pointer_ref(); register auto f asm("x0") = from->stack.pointer_ref(); @@ -854,6 +912,7 @@ R"( inline void switch_context_defer(thread* from, thread* to, void (*defer)(void*), void* arg) { + ASAN_SWITCH(to); prepare_switch(from, to); auto _t_ = to->stack.pointer_ref(); register auto f asm("x3") = from->stack.pointer_ref(); @@ -899,8 +958,10 @@ R"( func = (uint64_t)&spinlock_unlock; arg = &lock; } - _photon_switch_context_defer_die( - arg, func, sw.to->stack.pointer_ref()); + auto ref = sw.to->stack.pointer_ref(); + ASAN_DIE_SWITCH(sw.to); + _photon_switch_context_defer_die(arg, func, ref); + __builtin_unreachable(); } __attribute__((used)) static void _photon_thread_die(thread* th) { @@ -915,7 +976,8 @@ R"( RunQ rq; if (unlikely(!rq.current)) LOG_ERROR_RETURN(ENOSYS, nullptr, "Photon not initialized in this vCPU (OS thread)"); - size_t randomizer = (rand() % 512) * 8; + thread_local uint64_t random_index = 0; + size_t randomizer = (random_index++ % 512) * 8; // stack contains 
struct, randomizer space, and reserved_space size_t least_stack_size = sizeof(thread) + randomizer + 63 + reserved_space; // at least a whole page for mprotect @@ -1042,12 +1104,17 @@ R"( if (likely(__mimic_vdso_time_x86)) return photon::now = __mimic_vdso_time_x86.get_now(); #endif - struct timeval tv; - gettimeofday(&tv, NULL); - uint64_t nnow = tv.tv_sec * 1000ul * 1000ul + tv.tv_usec; + struct timespec tv; +#ifdef CLOCK_BOOTTIME + clock_gettime(CLOCK_BOOTTIME, &tv); +#else + clock_gettime(CLOCK_MONOTONIC, &tv); +#endif + auto usec = tv.tv_nsec / 1000ul; + uint64_t nnow = tv.tv_sec * 1000ul * 1000ul + usec; + assert(tv.tv_sec <= UINT32_MAX && usec < 1000000); now = nnow; - assert(tv.tv_sec <= UINT32_MAX && tv.tv_usec < 1000000); - return {nnow, ((uint64_t)tv.tv_sec << 32) | (uint32_t)tv.tv_usec}; + return {nnow, ((uint64_t)tv.tv_sec << 32) | (uint32_t)usec}; } __attribute__((always_inline)) static inline uint32_t _rdtsc() @@ -1142,11 +1209,11 @@ R"( list.node = nullptr; AtomicRunQ().insert_list_before(q); } - return count; } - if (sleepq.empty() || !if_update_now()) { + if (sleepq.empty()) { return count; } + if_update_now(); do { auto th = sleepq.front(); if (th->ts_wakeup > now) break; @@ -1197,6 +1264,8 @@ R"( assert(th->state == states::READY); } else if (unlikely(th->state != states::READY)) { LOG_ERROR_RETURN(EINVAL, -1, "target thread ` must be READY!", th); + } else if (unlikely(rq.current->next() == th)) { + return thread_yield(); } auto sw = AtomicRunQ(rq).try_goto(th); @@ -1938,4 +2007,43 @@ R"( photon_thread_alloc = _photon_thread_alloc; photon_thread_dealloc = _photon_thread_dealloc; } + + extern "C" { + [[gnu::used]] + void *gdb_get_thread_stack_ptr(void *th) { + if (!th) + return nullptr; + return ((thread *)th)->stack._ptr; + } + [[gnu::used]] + void *gdb_get_current_thread() { + return CURRENT; + } + [[gnu::used]] + void *gdb_get_next_thread(void *c) { + if (!c) + return nullptr; + return ((thread *)c)->next(); + } + [[gnu::used]] + void 
*gdb_get_vcpu(void *th) { + if (!th) + return nullptr; + return (void *)((thread *)th)->vcpu; + } + [[gnu::used]] + size_t gdb_get_sleepq_size(void *vcpu) { + if (!vcpu) + return 0; + return ((vcpu_t *)vcpu)->sleepq.q.size(); + } + [[gnu::used]] + void *gdb_get_sleepq_item(void *vcpu, size_t idx) { + if (!vcpu) + return nullptr; + if (((vcpu_t *)vcpu)->sleepq.q.size() <= idx) + return nullptr; + return ((vcpu_t *)vcpu)->sleepq.q[idx]; + } + } } diff --git a/thread/thread11.h b/thread/thread11.h index 355a95d93..889840a2b 100644 --- a/thread/thread11.h +++ b/thread/thread11.h @@ -29,6 +29,7 @@ namespace photon { static void* __stub11(void*) { auto p = thread_reserved_space(CURRENT); tuple_assistance::apply(std::move(p->first), std::move(p->second)); + p->~Pair(); return nullptr; } diff --git a/thread/workerpool.cpp b/thread/workerpool.cpp index b57f8b46d..daa740cec 100644 --- a/thread/workerpool.cpp +++ b/thread/workerpool.cpp @@ -121,7 +121,7 @@ class WorkPool::impl { auto task = ring.recv(running_tasks.load() ? 
0 : QUEUE_YIELD_COUNT, QUEUE_YIELD_US); if (!task) break; - running_tasks.fetch_add(std::memory_order_acq_rel); + running_tasks.fetch_add(1, std::memory_order_acq_rel); TaskLB tasklb{task, &running_tasks}; if (mode < 0) { delegate_helper(&tasklb); @@ -143,7 +143,7 @@ class WorkPool::impl { // must copy to keep tasklb alive TaskLB tasklb = *(TaskLB*)arg; tasklb.task(); - tasklb.count->fetch_sub(std::memory_order_acq_rel); + tasklb.count->fetch_sub(1, std::memory_order_acq_rel); return nullptr; } diff --git a/tools/photongdb.py b/tools/photongdb.py index d4d3e76b8..1f1f2885d 100644 --- a/tools/photongdb.py +++ b/tools/photongdb.py @@ -52,28 +52,25 @@ def cprint(stat, *args): def get_next_ready(p): - return gdb.parse_and_eval("(photon::thread*)%s" % p.dereference()['__next_ptr']) + return gdb.parse_and_eval("(void*)gdb_get_next_thread((void*){})".format(p)) def get_current(): - return gdb.parse_and_eval("(photon::thread*)photon::CURRENT") + return gdb.parse_and_eval("(void*)gdb_get_current_thread()") def get_vcpu(p): - return p.dereference()['vcpu'].dereference() - - -def get_sleepq(vcpu): - return vcpu['sleepq']['q'] + return gdb.parse_and_eval("(void*)gdb_get_vcpu((void*){})".format(p)) +def get_thread_stack_ptr(p): + return gdb.parse_and_eval("(void*)gdb_get_thread_stack_ptr((void*){})".format(p)) def in_sleep(q): - size = q['_M_impl']['_M_finish'] - q['_M_impl']['_M_start'] - return [(q['_M_impl']['_M_start'][i]) for i in range(size)] + size = int(gdb.parse_and_eval("(size_t)gdb_get_sleepq_size((void*){})".format(q))) + return [gdb.parse_and_eval("(void*)gdb_get_sleepq_item((void*){}, {})".format(q, i)) for i in range(size)] def switch_to_ph(regs, rsp, rbp, rip): - cprint('SWITCH', "to {} {} {}".format(hex(rsp), hex(rbp), hex(rip))) gdb.parse_and_eval("{}={}".format(regs['sp'], rsp)) gdb.parse_and_eval("{}={}".format(regs['bp'], rbp)) gdb.parse_and_eval("{}={}".format(regs['ip'], rip)) @@ -96,7 +93,7 @@ def set_u64_reg(l, r): def get_stkregs(p): - t = 
get_u64_ptr(p['stack']['_ptr']) + t = get_u64_ptr(get_thread_stack_ptr(p)) rsp = t + 8 rip = get_u64_val(t + 8) rbp = get_u64_val(t) @@ -119,7 +116,7 @@ def load_photon_threads(): photon.append(('READY', p, rsp, rbp, rip)) p = get_next_ready(p) vcpu = get_vcpu(c) - for t in in_sleep(get_sleepq(vcpu)): + for t in in_sleep(vcpu): rsp, rbp, rip = get_stkregs(t) photon.append(('SLEEP', t, rsp, rbp, rip)) return @@ -163,6 +160,7 @@ def invoke(self, arg, tty): arch = get_arch() regs = get_regs(arch) + cprint('SWITCH', "to {} {} {}".format(hex(photon[i][2]), hex(photon[i][3]), hex(photon[i][4]))) switch_to_ph(regs, photon[i][2], photon[i][3], photon[i][4]) @@ -225,6 +223,27 @@ def invoke(self, arg, tty): enabling = False cprint('WARNING', "Finished photon thread lookup mode.") +from threading import Lock + +class PhotonPs(gdb.Command): + def __init__(self): + gdb.Command.__init__(self, "photon_ps", + gdb.COMMAND_STACK, gdb.COMPLETE_NONE) + self.lock = Lock() + + def invoke(self, arg, tty): + with self.lock: + photon_init() + if len(photon) > 0: + for i, (stat, pth, rsp, rbp, rbi) in enumerate(photon): + cprint( + stat, '[{}]'.format(i), pth, hex(rsp), hex(rbp), hex(rbi)) + arch = get_arch() + regs = get_regs(arch) + switch_to_ph(regs, rsp, rbp, rbi) + gdb.execute("bt") + switch_to_ph(regs, photon[0][2], photon[0][3], photon[0][4]) + photon_restore() PhotonInit() PhotonFini() @@ -232,5 +251,6 @@ def invoke(self, arg, tty): PhotonThreads() PhotonLs() PhotonFr() +PhotonPs() cprint('INFO', 'Photon-GDB-extension loaded') diff --git a/tools/ppstack b/tools/ppstack new file mode 100755 index 000000000..ad5b4d64c --- /dev/null +++ b/tools/ppstack @@ -0,0 +1,50 @@ +#!/usr/bin/sh + +if test $# -ne 1; then + echo "Usage: `basename $0 .sh` " 1>&2 + exit 1 +fi + +if test ! -r /proc/$1; then + echo "Process $1 not found." 
1>&2 + exit 1 +fi + +# GDB doesn't allow "thread apply all bt" when the process isn't +# threaded; need to peek at the process to determine if that or the +# simpler "bt" should be used. + +backtrace="photon_ps" +if test -d /proc/$1/task ; then + # Newer kernel; has a task/ directory. + if test `/bin/ls /proc/$1/task | /usr/bin/wc -l` -gt 1 2>/dev/null ; then + backtrace="thread apply all photon_ps" + fi +elif test -f /proc/$1/maps ; then + # Older kernel; go by it loading libpthread. + if /bin/grep -e libpthread /proc/$1/maps > /dev/null 2>&1 ; then + backtrace="thread apply all photon_ps" + fi +fi + +GDB=${GDB:-gdb} +PHOTONDB=${PHOTONDB:-/usr/local/lib/photon/tools/photongdb.py} +PHOTONSO=${PHOTONSO:-/usr/local/lib/libphoton.so} + +# Run GDB, strip out unwanted noise. +# --readnever is no longer used since .gdb_index is now in use. +$GDB --quiet -nx $GDBARGS /proc/$1/exe $1 <&1 | +set width 0 +set height 0 +set pagination no +source $PHOTONDB +add-symbol-file $PHOTONSO +$backtrace +EOF +/bin/sed -n \ + -e 's/^\((gdb) \)*//' \ + -e '/^#/p' \ + -e '/^Thread/p' \ + -e '/CURRENT/p' \ + -e '/READY/p' \ + -e '/SLEEP/p' \ No newline at end of file