-
Notifications
You must be signed in to change notification settings - Fork 14
[Join] Inline and parallelize tbb in getAllTableColumnFragments. #616
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,15 +16,16 @@ | |
|
|
||
| #include "QueryEngine/ColumnFetcher.h" | ||
|
|
||
| #include <memory> | ||
|
|
||
| #include "DataMgr/ArrayNoneEncoder.h" | ||
| #include "QueryEngine/ErrorHandling.h" | ||
| #include "QueryEngine/Execute.h" | ||
| #include "Shared/Intervals.h" | ||
| #include "Shared/likely.h" | ||
| #include "Shared/sqltypes.h" | ||
|
|
||
| #include <tbb/parallel_for.h> | ||
| #include <memory> | ||
|
|
||
| namespace { | ||
|
|
||
| std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel) { | ||
|
|
@@ -239,6 +240,11 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( | |
| int db_id = col_info->db_id; | ||
| int table_id = col_info->table_id; | ||
| int col_id = col_info->column_id; | ||
|
|
||
| // Array type passed to getAllTableColumnFragments. Should be handled in | ||
| // linearization. | ||
| CHECK(!col_info->type->isString() && !col_info->type->isArray()); | ||
|
|
||
| const auto fragments_it = all_tables_fragments.find({db_id, table_id}); | ||
| CHECK(fragments_it != all_tables_fragments.end()); | ||
| const auto fragments = fragments_it->second; | ||
|
|
@@ -248,7 +254,6 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( | |
| const InputDescriptor table_desc(db_id, table_id, int(0)); | ||
| { | ||
| std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_); | ||
|
|
||
| auto col_token = data_provider_->getZeroCopyColumnData(*col_info); | ||
| if (col_token != nullptr) { | ||
| size_t num_rows = col_token->getSize() / col_token->getType()->size(); | ||
|
|
@@ -262,44 +267,91 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments( | |
| } | ||
|
|
||
| auto column_it = columnarized_scan_table_cache_.find({table_id, col_id}); | ||
| if (column_it == columnarized_scan_table_cache_.end()) { | ||
| for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { | ||
| if (executor_->getConfig() | ||
| .exec.interrupt.enable_non_kernel_time_query_interrupt && | ||
| executor_->checkNonKernelTimeInterrupted()) { | ||
| throw QueryExecutionError(Executor::ERR_INTERRUPTED); | ||
| } | ||
| std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder; | ||
| std::list<ChunkIter> chunk_iter_holder; | ||
| const auto& fragment = (*fragments)[frag_id]; | ||
| if (fragment.isEmptyPhysicalFragment()) { | ||
| continue; | ||
| } | ||
| auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id); | ||
| CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end()); | ||
| auto col_buffer = getOneTableColumnFragment(col_info, | ||
| static_cast<int>(frag_id), | ||
| all_tables_fragments, | ||
| chunk_holder, | ||
| chunk_iter_holder, | ||
| Data_Namespace::CPU_LEVEL, | ||
| int(0), | ||
| device_allocator); | ||
| column_frags.push_back( | ||
| std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_, | ||
| col_buffer, | ||
| fragment.getNumTuples(), | ||
| chunk_meta_it->second->type(), | ||
| thread_idx)); | ||
| } | ||
| auto merged_results = | ||
| ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags); | ||
| if (column_it != columnarized_scan_table_cache_.end()) { | ||
| table_column = column_it->second.get(); | ||
| return ColumnFetcher::transferColumnIfNeeded( | ||
| table_column, 0, memory_level, device_id, device_allocator); | ||
| } | ||
|
|
||
| if (executor_->getConfig().exec.interrupt.enable_non_kernel_time_query_interrupt && | ||
| executor_->checkNonKernelTimeInterrupted()) { | ||
| throw QueryExecutionError(Executor::ERR_INTERRUPTED); | ||
| } | ||
|
|
||
| size_t total_row_count = 0; | ||
| for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { | ||
| const auto& fragment = (*fragments)[frag_id]; | ||
| const auto rows_in_frag = fragment.getNumTuples(); | ||
| total_row_count += rows_in_frag; | ||
| } | ||
|
|
||
| if (total_row_count == 0) { | ||
| std::unique_ptr<ColumnarResults> merged_results(nullptr); | ||
|
|
||
| table_column = merged_results.get(); | ||
| columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id), | ||
| std::move(merged_results)); | ||
| } else { | ||
| table_column = column_it->second.get(); | ||
|
|
||
| return ColumnFetcher::transferColumnIfNeeded( | ||
| table_column, 0, memory_level, device_id, device_allocator); | ||
| } | ||
|
|
||
| const auto type_width = col_info->type->size(); | ||
| auto write_ptr = | ||
| executor_->row_set_mem_owner_->allocate(type_width * total_row_count); | ||
ienkovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| std::vector<std::pair<int8_t*, size_t>> write_ptrs; | ||
| std::vector<size_t> valid_fragments; | ||
| for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) { | ||
| const auto& fragment = (*fragments)[frag_id]; | ||
| if (fragment.isEmptyPhysicalFragment()) { | ||
| continue; | ||
| } | ||
| CHECK_EQ(type_width, fragment.getChunkMetadataMap().at(col_id)->type()->size()); | ||
| write_ptrs.push_back({write_ptr, fragment.getNumTuples() * type_width}); | ||
| write_ptr += fragment.getNumTuples() * type_width; | ||
| valid_fragments.push_back(frag_id); | ||
| } | ||
|
|
||
| CHECK(!write_ptrs.empty()); | ||
| size_t valid_frag_count = valid_fragments.size(); | ||
| tbb::parallel_for( | ||
| tbb::blocked_range<size_t>(0, valid_frag_count), | ||
| [&](const tbb::blocked_range<size_t>& frag_ids) { | ||
| for (size_t v_frag_id = frag_ids.begin(); v_frag_id < frag_ids.end(); | ||
| ++v_frag_id) { | ||
| std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder; | ||
| std::list<ChunkIter> chunk_iter_holder; | ||
| size_t frag_id = valid_fragments[v_frag_id]; | ||
| const auto& fragment = (*fragments)[frag_id]; | ||
| auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id); | ||
| CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end()); | ||
| std::shared_ptr<Chunk_NS::Chunk> chunk; | ||
| { | ||
| ChunkKey chunk_key{ | ||
| db_id, fragment.physicalTableId, col_id, fragment.fragmentId}; | ||
| chunk = data_provider_->getChunk(col_info, | ||
| chunk_key, | ||
| Data_Namespace::CPU_LEVEL, | ||
| 0, | ||
| chunk_meta_it->second->numBytes(), | ||
| chunk_meta_it->second->numElements()); | ||
| std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_); | ||
| chunk_holder.push_back(chunk); | ||
| } | ||
| auto ab = chunk->getBuffer(); | ||
| CHECK(ab->getMemoryPtr()); | ||
| int8_t* col_buffer = | ||
| ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter | ||
| memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second); | ||
ienkovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| }); | ||
|
|
||
| std::unique_ptr<ColumnarResults> merged_results(new ColumnarResults( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The vector
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't understand. Do you mean that we are using only the data from the first fragment? If so, why are we fetching them?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, sorry, now I get it. |
||
| {write_ptrs[0].first}, total_row_count, col_info->type, thread_idx)); | ||
|
|
||
| table_column = merged_results.get(); | ||
| columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id), | ||
| std::move(merged_results)); | ||
| } | ||
| return ColumnFetcher::transferColumnIfNeeded( | ||
| table_column, 0, memory_level, device_id, device_allocator); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.