From f28d59cb06b0ebfa2fa9ba6839bf126dd6f1c31a Mon Sep 17 00:00:00 2001 From: Dmitrii Makarenko Date: Tue, 1 Aug 2023 06:58:50 +0000 Subject: [PATCH] [Fetch Data] Linearized buffer memory managment This commit removes freeLinearizedBuf() to avoid chunks that have references to deleted buffers. They will now be marked as deleted when unpinned. All buffers should now be removed via BufferManagers::free. Signed-off-by: Dmitrii Makarenko --- omniscidb/DataMgr/BufferMgr/Buffer.cpp | 33 ++++++++++ omniscidb/DataMgr/BufferMgr/Buffer.h | 22 ++----- omniscidb/DataMgr/BufferMgr/BufferMgr.cpp | 12 +++- omniscidb/DataMgr/Chunk/Chunk.h | 9 ++- omniscidb/QueryEngine/ColumnFetcher.cpp | 35 ++-------- omniscidb/QueryEngine/ColumnFetcher.h | 29 ++++++++- omniscidb/QueryEngine/Execute.cpp | 4 -- omniscidb/Tests/JoinHashTableTest.cpp | 79 +++++++++++++++++++++++ 8 files changed, 166 insertions(+), 57 deletions(-) diff --git a/omniscidb/DataMgr/BufferMgr/Buffer.cpp b/omniscidb/DataMgr/BufferMgr/Buffer.cpp index dc804b69ba..c7e7520d65 100644 --- a/omniscidb/DataMgr/BufferMgr/Buffer.cpp +++ b/omniscidb/DataMgr/BufferMgr/Buffer.cpp @@ -166,4 +166,37 @@ int8_t* Buffer::getMemoryPtr() { void Buffer::setMemoryPtr(int8_t* new_ptr) { mem_ = new_ptr; } + +void Buffer::deleteSelf() { + // ZeroCopy buffers don't have correct iterators by design. + // To delete it we are detecting them and delete explicitly without removing segment as + // it done in deleteBuffer(...) + if (seg_it_ == BufferList::iterator()) { + delete this; + return; + } + + bm_->deleteBuffer(seg_it_->chunk_key); +} + +int Buffer::unPin() { + std::unique_lock pin_lock(pin_mutex_); + int res = (--pin_count_); + if (!res && delete_on_unpin_) { + pin_lock.unlock(); + // deleteSelf + deleteSelf(); + } + return res; +} + +void Buffer::deleteWhenUnpinned() { + std::unique_lock pin_lock(pin_mutex_); + if (pin_count_) { + delete_on_unpin_ = true; + } else { + pin_lock.unlock(); + deleteSelf(); + } +} } // namespace Buffer_Namespace diff --git a/omniscidb/DataMgr/BufferMgr/Buffer.h b/omniscidb/DataMgr/BufferMgr/Buffer.h index 92a880e2d4..ea62d5ec9f 100644 --- a/omniscidb/DataMgr/BufferMgr/Buffer.h +++ b/omniscidb/DataMgr/BufferMgr/Buffer.h @@ -134,28 +134,14 @@ class Buffer : public AbstractBuffer { return (++pin_count_); } - inline int unPin() override { - std::lock_guard pin_lock(pin_mutex_); - int res = (--pin_count_); - if (!res && delete_on_unpin_) { - delete this; - } - return res; - } + int unPin() override; + inline int getPinCount() override { std::lock_guard pin_lock(pin_mutex_); return (pin_count_); } - inline void deleteWhenUnpinned() override { - std::unique_lock pin_lock(pin_mutex_); - if (pin_count_) { - delete_on_unpin_ = true; - } else { - pin_lock.unlock(); - delete this; - } - } + void deleteWhenUnpinned() override; // Added for testing. int32_t getSlabNum() const { return seg_it_->slab_num; } @@ -177,6 +163,8 @@ class Buffer : public AbstractBuffer { const MemoryLevel src_buffer_type = CPU_LEVEL, const int src_device_id = -1) = 0; + void deleteSelf(); + BufferMgr* bm_; BufferList::iterator seg_it_; size_t page_size_; /// the size of each page in the buffer diff --git a/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp b/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp index fc710d462e..530c0c7730 100644 --- a/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp +++ b/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp @@ -103,6 +103,12 @@ void BufferMgr::clear() { // for removal to have them deleted when unpinned. for (auto& buf : chunk_index_) { if (buf.second->buffer) { + // WARN !!!!!!!!!!!!!!!!!! + // deleteWhenUnpinned(...) will call free(...) that will call deleteBuffer(...) in + // case when segment iterator is valid. That method will try to acquire + // chunk_index_mutex_ that already acquired here. To avoid deadlock and remove all + // the stuff, we are cleaning segments later we are removing segment iterator. + buf.second->buffer->seg_it_ = BufferList::iterator(); buf.second->buffer->deleteWhenUnpinned(); buf.second->buffer = nullptr; } @@ -623,6 +629,7 @@ void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) { chunk_index_lock.unlock(); std::lock_guard sized_segs_lock(sized_segs_mutex_); if (seg_it->buffer) { + CHECK_EQ(seg_it->buffer->getPinCount(), 0); delete seg_it->buffer; // Delete Buffer for segment seg_it->buffer = 0; } @@ -826,12 +833,15 @@ AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) { return createBuffer(chunk_key, page_size_, num_bytes); } +// all buffer deletions should be done via free(...) void BufferMgr::free(AbstractBuffer* buffer) { Buffer* casted_buffer = dynamic_cast(buffer); if (casted_buffer == 0) { LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type."; } - deleteBuffer(casted_buffer->seg_it_->chunk_key); + CHECK_EQ(casted_buffer->getPinCount(), 1); + casted_buffer->deleteWhenUnpinned(); + casted_buffer->unPin(); } size_t BufferMgr::getNumChunks() { diff --git a/omniscidb/DataMgr/Chunk/Chunk.h b/omniscidb/DataMgr/Chunk/Chunk.h index 30de33d30b..cfcf77959a 100644 --- a/omniscidb/DataMgr/Chunk/Chunk.h +++ b/omniscidb/DataMgr/Chunk/Chunk.h @@ -51,7 +51,14 @@ class Chunk { : buffer_(nullptr), index_buf_(nullptr), column_info_(col_info) {} Chunk(AbstractBuffer* b, AbstractBuffer* ib, ColumnInfoPtr col_info) - : buffer_(b), index_buf_(ib), column_info_(col_info) {} + : buffer_(b), index_buf_(ib), column_info_(col_info) { + if (buffer_) { + buffer_->pin(); + } + if (index_buf_) { + index_buf_->pin(); + } + } ~Chunk() { unpinBuffer(); } diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp index 62fd71f33a..0b2bc13ede 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.cpp +++ b/omniscidb/QueryEngine/ColumnFetcher.cpp @@ -521,6 +521,10 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( } } CHECK(res.first); // check merged data buffer + // This buffers are associated with Chunk, that created by hands, not with + // Chunk::getChunk(...) method So it should be removed to do it we mark both buffers to + // delete on unpin in ColumnFetcher dtor. Pin means that none of chunks are uses this + // buffer. if (!type->isFixedLenArray()) { CHECK(res.second); // check merged index buffer } @@ -1061,34 +1065,3 @@ ChunkIter ColumnFetcher::prepareChunkIter(AbstractBuffer* merged_data_buf, merged_chunk_iter.elem_type_size = chunk_iter.elem_type_size; return merged_chunk_iter; } - -void ColumnFetcher::freeLinearizedBuf() { - std::lock_guard linearized_col_cache_guard(linearized_col_cache_mutex_); - auto buffer_provider = executor_->getBufferProvider(); - - if (!linearized_data_buf_cache_.empty()) { - for (auto& kv : linearized_data_buf_cache_) { - for (auto& kv2 : kv.second) { - buffer_provider->free(kv2.second); - } - } - } - - if (!linearized_idx_buf_cache_.empty()) { - for (auto& kv : linearized_idx_buf_cache_) { - for (auto& kv2 : kv.second) { - buffer_provider->free(kv2.second); - } - } - } -} - -void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf() { - std::lock_guard linearized_col_cache_guard(linearized_col_cache_mutex_); - auto buffer_provider = executor_->getBufferProvider(); - if (!linearlized_temporary_cpu_index_buf_cache_.empty()) { - for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) { - buffer_provider->free(kv.second); - } - } -} diff --git a/omniscidb/QueryEngine/ColumnFetcher.h b/omniscidb/QueryEngine/ColumnFetcher.h index 6a43f56e26..6694197067 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.h +++ b/omniscidb/QueryEngine/ColumnFetcher.h @@ -37,6 +37,32 @@ class ColumnFetcher { ColumnFetcher(Executor* executor, DataProvider* data_provider, const ColumnCacheMap& column_cache); + ~ColumnFetcher() { + if (!linearized_data_buf_cache_.empty()) { + for (auto& kv : linearized_data_buf_cache_) { + for (auto& kv2 : kv.second) { + kv2.second->deleteWhenUnpinned(); + kv2.second->unPin(); + } + } + } + + if (!linearized_idx_buf_cache_.empty()) { + for (auto& kv : linearized_idx_buf_cache_) { + for (auto& kv2 : kv.second) { + kv2.second->deleteWhenUnpinned(); + kv2.second->unPin(); + } + } + } + + if (!linearlized_temporary_cpu_index_buf_cache_.empty()) { + for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) { + kv.second->deleteWhenUnpinned(); + kv.second->unPin(); + } + } + }; //! Gets one chunk's pointer and element count on either CPU or GPU. static std::pair getOneColumnFragment( @@ -93,9 +119,6 @@ class ColumnFetcher { DeviceAllocator* device_allocator, const size_t thread_idx) const; - void freeTemporaryCpuLinearizedIdxBuf(); - void freeLinearizedBuf(); - DataProvider* getDataProvider() const { return data_provider_; } private: diff --git a/omniscidb/QueryEngine/Execute.cpp b/omniscidb/QueryEngine/Execute.cpp index 3bdbe9abbe..2d3ab83e42 100644 --- a/omniscidb/QueryEngine/Execute.cpp +++ b/omniscidb/QueryEngine/Execute.cpp @@ -2186,10 +2186,6 @@ hdk::ResultSetTable Executor::executeWorkUnitImpl( do { SharedKernelContext shared_context(query_infos); ColumnFetcher column_fetcher(this, data_provider, column_cache); - ScopeGuard scope_guard = [&column_fetcher] { - column_fetcher.freeLinearizedBuf(); - column_fetcher.freeTemporaryCpuLinearizedIdxBuf(); - }; if (ra_exe_unit.isShuffle()) { allocateShuffleBuffers(query_infos, ra_exe_unit, row_set_mem_owner, shared_context); diff --git a/omniscidb/Tests/JoinHashTableTest.cpp b/omniscidb/Tests/JoinHashTableTest.cpp index a1decec97d..7b46788141 100644 --- a/omniscidb/Tests/JoinHashTableTest.cpp +++ b/omniscidb/Tests/JoinHashTableTest.cpp @@ -670,6 +670,85 @@ TEST(Other, Regression) { dropTable("table_b"); } +TEST(Other, FixedLenArr) { + createTable("table_a", + {{"si", ctx().int16()}, {"FixedLen", ctx().arrayFixed(2, ctx().int64())}}, + {2}); + + std::ostringstream oss; + oss << "{\"si\": 1, \"FixedLen\": [" << 7 << ", " << 233 << "]}" << std::endl; + oss << "{\"si\": 1, \"FixedLen\": [" << 5 << ", " << 47 << "]}" << std::endl; + oss << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl; + oss << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl; + oss << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl; + oss << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl; + oss << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl; + insertJsonValues("table_a", oss.str()); + + createTable("table_b", + {{"si", ctx().int8()}, {"FixedLen", ctx().arrayFixed(2, ctx().int32())}}, + {2}); + + std::ostringstream oss_2; + oss_2 << "{\"si\": 1, \"FixedLen\": [" << 2 << ", " << 33 << "]}" << std::endl; + oss_2 << "{\"si\": 2, \"FixedLen\": [" << 1 << ", " << 17 << "]}" << std::endl; + oss_2 << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl; + oss_2 << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl; + oss_2 << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl; + oss_2 << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl; + oss_2 << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl; + insertJsonValues("table_b", oss_2.str()); + + EXPECT_NO_THROW(run_multiple_agg(R"( + SELECT * FROM table_a, table_b WHERE table_a.si = table_b.si; + )", + ExecutorDeviceType::CPU)); + + dropTable("table_a"); + dropTable("table_b"); +} + +TEST(Other, FixedLenArr2) { + config().rs.enable_lazy_fetch = false; + + createTable("table_a", + {{"si", ctx().int16()}, {"FixedLen", ctx().arrayFixed(2, ctx().int16())}}, + {1}); + + std::ostringstream oss; + oss << "{\"si\": 1, \"FixedLen\": [" << 7 << ", " << 233 << "]}" << std::endl; + oss << "{\"si\": 1, \"FixedLen\": [" << 5 << ", " << 47 << "]}" << std::endl; + oss << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl; + oss << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl; + oss << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl; + oss << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl; + oss << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl; + insertJsonValues("table_a", oss.str()); + + createTable("table_b", + {{"si", ctx().int8()}, {"FixedLen", ctx().arrayFixed(2, ctx().int32())}}, + {2}); + + std::ostringstream oss_2; + oss_2 << "{\"si\": 1, \"FixedLen\": [" << 2 << ", " << 33 << "]}" << std::endl; + oss_2 << "{\"si\": 2, \"FixedLen\": [" << 1 << ", " << 17 << "]}" << std::endl; + oss_2 << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl; + oss_2 << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl; + oss_2 << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl; + oss_2 << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl; + oss_2 << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl; + insertJsonValues("table_b", oss_2.str()); + + EXPECT_NO_THROW(run_multiple_agg(R"( + SELECT * FROM table_a INNER JOIN table_b ON table_a.FixedLen[1] = + table_b.FixedLen[1]; + )", + ExecutorDeviceType::CPU)); + + dropTable("table_a"); + dropTable("table_b"); +} + int main(int argc, char** argv) { TestHelpers::init_logger_stderr_only(argc, argv); testing::InitGoogleTest(&argc, argv);