Skip to content

Commit

Permalink
[refactor](execenv) remove shared ptr from exec env
Browse files Browse the repository at this point in the history
  • Loading branch information
yiguolei committed Dec 28, 2024
1 parent 2b20512 commit ff37b66
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 53 deletions.
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_stream_load_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
namespace doris {

class CloudStreamLoadExecutor final : public StreamLoadExecutor {
ENABLE_FACTORY_CREATOR(CloudStreamLoadExecutor);

public:
CloudStreamLoadExecutor(ExecEnv* exec_env);

Expand Down
6 changes: 0 additions & 6 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@

namespace doris {

ExecEnv::ExecEnv() = default;

ExecEnv::~ExecEnv() {
destroy();
}

#ifdef BE_TEST
void ExecEnv::set_inverted_index_searcher_cache(
segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache) {
Expand Down
30 changes: 12 additions & 18 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,14 @@ class ExecEnv {
}
LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); }
std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; }
NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr.get(); }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; }
GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; }

const std::vector<StorePath>& store_paths() const { return _store_paths; }

std::shared_ptr<StreamLoadExecutor> stream_load_executor() { return _stream_load_executor; }
StreamLoadExecutor* stream_load_executor() { return _stream_load_executor.get(); }
RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; }
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; }
Expand All @@ -273,12 +273,10 @@ class ExecEnv {
_memtable_memory_limiter.reset(limiter);
}
void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; }
void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr) {
this->_new_load_stream_mgr = new_load_stream_mgr;
}
void set_stream_load_executor(std::shared_ptr<StreamLoadExecutor> stream_load_executor) {
this->_stream_load_executor = stream_load_executor;
}
void set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr);
void clear_new_load_stream_mgr();
void set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor);
void clear_stream_load_executor();

void set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine);
void set_inverted_index_searcher_cache(
Expand All @@ -294,10 +292,9 @@ class ExecEnv {
void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) {
this->_routine_load_task_executor = r;
}
void set_wal_mgr(std::shared_ptr<WalManager> wm) { this->_wal_manager = wm; }
void set_dummy_lru_cache(std::shared_ptr<DummyLRUCache> dummy_lru_cache) {
this->_dummy_lru_cache = dummy_lru_cache;
}
void set_wal_mgr(std::unique_ptr<WalManager>&& wm);
void clear_wal_mgr();

void set_write_cooldown_meta_executors();
static void set_tracking_memory(bool tracking_memory) {
_s_tracking_memory.store(tracking_memory, std::memory_order_release);
Expand Down Expand Up @@ -331,7 +328,6 @@ class ExecEnv {
return _inverted_index_query_cache;
}
QueryCache* get_query_cache() { return _query_cache; }
std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return _dummy_lru_cache; }

pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
return _runtime_filter_timer_queue;
Expand Down Expand Up @@ -429,13 +425,12 @@ class ExecEnv {
BrokerMgr* _broker_mgr = nullptr;
LoadChannelMgr* _load_channel_mgr = nullptr;
std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
// TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle.
std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
std::unique_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
BrpcClientCache<PBackendService_Stub>* _streaming_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;

std::shared_ptr<StreamLoadExecutor> _stream_load_executor;
std::unique_ptr<StreamLoadExecutor> _stream_load_executor;
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
Expand All @@ -446,7 +441,7 @@ class ExecEnv {
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;
std::unique_ptr<WalManager> _wal_manager;
DNSCache* _dns_cache = nullptr;
std::unique_ptr<WriteCooldownMetaExecutors> _write_cooldown_meta_executors;

Expand All @@ -473,7 +468,6 @@ class ExecEnv {
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
QueryCache* _query_cache = nullptr;
std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
std::unique_ptr<io::FDCache> _file_cache_open_fd_cache;

pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;
Expand Down
39 changes: 33 additions & 6 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ ThreadPool* ExecEnv::non_block_close_thread_pool() {
#endif
}

ExecEnv::ExecEnv() = default;

ExecEnv::~ExecEnv() {
destroy();
}

Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
const std::vector<StorePath>& spill_store_paths,
const std::set<std::string>& broken_paths) {
Expand Down Expand Up @@ -290,16 +296,16 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_store_paths.size() * config::flush_thread_num_per_store,
static_cast<size_t>(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu);
_load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
_new_load_stream_mgr = NewLoadStreamMgr::create_shared();
_new_load_stream_mgr = NewLoadStreamMgr::create_unique();
_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_streaming_client_cache =
new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", "streaming");
_function_client_cache =
new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
if (config::is_cloud_mode()) {
_stream_load_executor = std::make_shared<CloudStreamLoadExecutor>(this);
_stream_load_executor = CloudStreamLoadExecutor::create_unique(this);
} else {
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_stream_load_executor = StreamLoadExecutor::create_unique(this);
}
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit()));
Expand All @@ -309,7 +315,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_file_cache_open_fd_cache = std::make_unique<io::FDCache>();
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
_wal_manager = WalManager::create_unique(this, config::group_commit_wal_path);
_dns_cache = new DNSCache();
_write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();
_spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map));
Expand Down Expand Up @@ -464,8 +470,6 @@ Status ExecEnv::_init_mem_env() {
return Status::InternalError(ss.str());
}

_dummy_lru_cache = std::make_shared<DummyLRUCache>();

_cache_manager = CacheManager::create_global_instance();

int64_t storage_cache_limit =
Expand Down Expand Up @@ -681,7 +685,30 @@ void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
}
#ifdef BE_TEST
void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr) {
this->_new_load_stream_mgr = std::move(new_load_stream_mgr);
}

void ExecEnv::clear_new_load_stream_mgr() {
this->_new_load_stream_mgr.reset();
}

void ExecEnv::set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor) {
this->_stream_load_executor = std::move(stream_load_executor);
}

void ExecEnv::clear_stream_load_executor() {
this->_stream_load_executor.reset();
}

void ExecEnv::set_wal_mgr(std::unique_ptr<WalManager>&& wm) {
this->_wal_manager = std::move(wm);
}
void ExecEnv::clear_wal_mgr() {
this->_wal_manager.reset();
}
#endif
// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method.
// We need to stop all threads before releasing resource.
void ExecEnv::destroy() {
Expand Down
12 changes: 5 additions & 7 deletions be/src/runtime/memory/lru_cache_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ class LRUCachePolicy : public CachePolicy {
new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards,
element_count_capacity, is_lru_k));
} else {
CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
_cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
_cache = std::make_shared<doris::DummyLRUCache>();
}
_init_mem_tracker(lru_cache_type_string(lru_cache_type));
}
Expand All @@ -64,8 +63,7 @@ class LRUCachePolicy : public CachePolicy {
cache_value_time_extractor, cache_value_check_timestamp,
element_count_capacity, is_lru_k));
} else {
CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache());
_cache = ExecEnv::GetInstance()->get_dummy_lru_cache();
_cache = std::make_shared<doris::DummyLRUCache>();
}
_init_mem_tracker(lru_cache_type_string(lru_cache_type));
}
Expand Down Expand Up @@ -157,7 +155,7 @@ class LRUCachePolicy : public CachePolicy {
std::lock_guard<std::mutex> l(_lock);
COUNTER_SET(_freed_entrys_counter, (int64_t)0);
COUNTER_SET(_freed_memory_counter, (int64_t)0);
if (_stale_sweep_time_s <= 0 || _cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) {
if (_stale_sweep_time_s <= 0 || std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
return;
}
if (exceed_prune_limit()) {
Expand Down Expand Up @@ -204,7 +202,7 @@ class LRUCachePolicy : public CachePolicy {
std::lock_guard<std::mutex> l(_lock);
COUNTER_SET(_freed_entrys_counter, (int64_t)0);
COUNTER_SET(_freed_memory_counter, (int64_t)0);
if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) {
if (std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
return;
}
if ((force && mem_consumption() != 0) || exceed_prune_limit()) {
Expand Down Expand Up @@ -246,7 +244,7 @@ class LRUCachePolicy : public CachePolicy {
COUNTER_SET(_freed_entrys_counter, (int64_t)0);
COUNTER_SET(_freed_memory_counter, (int64_t)0);
COUNTER_SET(_cost_timer, (int64_t)0);
if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) {
if (std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
return 0;
}

Expand Down
4 changes: 2 additions & 2 deletions be/test/http/stream_load_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ void http_request_done_cb(struct evhttp_request* req, void* arg) {

TEST_F(StreamLoadTest, TestHeader) {
// 1G
auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), config::group_commit_wal_path);
auto wal_mgr = WalManager::create_unique(ExecEnv::GetInstance(), config::group_commit_wal_path);
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0));
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 0));
static_cast<void>(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 0));
ExecEnv::GetInstance()->set_wal_mgr(wal_mgr);
ExecEnv::GetInstance()->set_wal_mgr(std::move(wal_mgr));
// 1. empty info
{
auto* evhttp_req = evhttp_request_new(nullptr, nullptr);
Expand Down
13 changes: 8 additions & 5 deletions be/test/olap/wal/wal_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ class WalManagerTest : public testing::Test {
_env->_cluster_info->master_fe_addr.hostname = "host name";
_env->_cluster_info->master_fe_addr.port = 1234;
_env->_cluster_info->backend_id = 1001;
_env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared();
_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_env->_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_env->_stream_load_executor = StreamLoadExecutor::create_shared(_env);
_env->_stream_load_executor = StreamLoadExecutor::create_unique(_env);
_env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
_env->_wal_manager = WalManager::create_shared(_env, wal_dir.string());
_env->set_wal_mgr(WalManager::create_unique(_env, wal_dir.string()));
k_stream_load_begin_result = TLoadTxnBeginResult();
}
void TearDown() override {
Expand All @@ -78,6 +78,9 @@ class WalManagerTest : public testing::Test {
SAFE_DELETE(_env->_function_client_cache);
SAFE_DELETE(_env->_internal_client_cache);
SAFE_DELETE(_env->_cluster_info);
_env->clear_new_load_stream_mgr();
_env->clear_stream_load_executor();
//_env->clear_wal_mgr();
}

void prepare() {
Expand Down Expand Up @@ -155,9 +158,9 @@ TEST_F(WalManagerTest, recovery_normal) {
}

TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
auto wal_mgr = WalManager::create_shared(_env, config::group_commit_wal_path);
auto wal_mgr = WalManager::create_unique(_env, config::group_commit_wal_path);
static_cast<void>(wal_mgr->init());
_env->set_wal_mgr(wal_mgr);
_env->set_wal_mgr(std::move(wal_mgr));

// 1T
size_t available_bytes = 1099511627776;
Expand Down
18 changes: 11 additions & 7 deletions be/test/runtime/routine_load_task_executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,27 @@ class RoutineLoadTaskExecutorTest : public testing::Test {
RoutineLoadTaskExecutorTest() = default;
~RoutineLoadTaskExecutorTest() override = default;

ExecEnv* _env = nullptr;

void SetUp() override {
_env = ExecEnv::GetInstance();
k_stream_load_begin_result = TLoadTxnBeginResult();
k_stream_load_commit_result = TLoadTxnCommitResult();
k_stream_load_rollback_result = TLoadTxnRollbackResult();
k_stream_load_put_result = TStreamLoadPutResult();

_env.set_cluster_info(new ClusterInfo());
_env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));
_env->set_cluster_info(new ClusterInfo());
_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env->set_stream_load_executor(StreamLoadExecutor::create_unique(_env));

config::max_routine_load_thread_pool_size = 1024;
config::max_consumer_num_per_group = 3;
}

void TearDown() override { delete _env.cluster_info(); }

ExecEnv _env;
void TearDown() override {
_env->clear_new_load_stream_mgr();
_env->clear_stream_load_executor();
}
};

TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
Expand All @@ -92,7 +96,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {

task.__set_kafka_load_info(k_info);

RoutineLoadTaskExecutor executor(&_env);
RoutineLoadTaskExecutor executor(_env);
Status st;
st = executor.init(1024 * 1024);
EXPECT_TRUE(st.ok());
Expand Down
1 change: 0 additions & 1 deletion be/test/testutil/run_all_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());
doris::ExecEnv::GetInstance()->set_process_profile(
doris::ProcessProfile::create_global_instance());
doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared<doris::DummyLRUCache>());
doris::ExecEnv::GetInstance()->set_storage_page_cache(
doris::StoragePageCache::create_global_cache(1 << 30, 10, 0));
doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000, 1000));
Expand Down
3 changes: 2 additions & 1 deletion be/test/vec/exec/vwal_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class VWalScannerTest : public testing::Test {
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
fmt::format("fail to delete dir={}", _wal_dir));
SAFE_STOP(_env->_wal_manager);
_env->clear_wal_mgr();
}

protected:
Expand Down Expand Up @@ -286,7 +287,7 @@ void VWalScannerTest::init() {
_env->_cluster_info->master_fe_addr.hostname = "host name";
_env->_cluster_info->master_fe_addr.port = _backend_id;
_env->_cluster_info->backend_id = 1001;
_env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
_env->set_wal_mgr(WalManager::create_unique(_env, _wal_dir));
std::string base_path;
auto st = _env->_wal_manager->_init_wal_dirs_info();
st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1, _label_1, base_path,
Expand Down

0 comments on commit ff37b66

Please sign in to comment.