From e5ea210d2cac1b2cffa6bcebf66e922dbf44f209 Mon Sep 17 00:00:00 2001
From: Matthieu Gautier
Date: Tue, 24 May 2022 16:22:06 +0200
Subject: [PATCH] Add a template specialization for ConcurrentCache storing shared_ptr

When ConcurrentCache stores a shared_ptr, callers may still hold that
shared_ptr after the ConcurrentCache has dropped it. When we "recreate" a
value to put back into the cache, we don't want to actually recreate it,
but rather copy the shared_ptr still in use.

To do so we use an (unlimited) store of weak_ptr (aka `WeakStore`).
Every shared_ptr added to the cache also has a weak_ptr reference stored
in the WeakStore, and we check the WeakStore before creating the value.
---
 src/tools/concurrent_cache.h | 107 +++++++++++++++++++++++++++++++++++
 test/lrucache.cpp            |  21 +++++++
 2 files changed, 128 insertions(+)

diff --git a/src/tools/concurrent_cache.h b/src/tools/concurrent_cache.h
index 3ee8ce60e..5195430c5 100644
--- a/src/tools/concurrent_cache.h
+++ b/src/tools/concurrent_cache.h
@@ -95,6 +95,113 @@ protected: // data
   std::mutex lock_;
 };
 
+
+/**
+   WeakStore represents a thread-safe store (map) of weak pointers.
+   It allows storing a weak_ptr obtained from a shared_ptr and retrieving a
+   shared_ptr from a potentially non-expired weak_ptr.
+   It is not limited in size.
+*/
+template<typename Key, typename Value>
+class WeakStore {
+  private: // types
+    typedef std::weak_ptr<Value> WeakValue;
+
+  public:
+    explicit WeakStore() = default;
+
+    std::shared_ptr<Value> get(const Key& key)
+    {
+      std::lock_guard<std::mutex> l(m_lock);
+      auto it = m_weakMap.find(key);
+      if (it != m_weakMap.end()) {
+        auto shared = it->second.lock();
+        if (shared) {
+          return shared;
+        } else {
+          m_weakMap.erase(it);
+        }
+      }
+      throw std::runtime_error("No weak ptr");
+    }
+
+    void add(const Key& key, std::shared_ptr<Value> shared)
+    {
+      std::lock_guard<std::mutex> l(m_lock);
+      m_weakMap[key] = WeakValue(shared);
+    }
+
+  private: // data
+    std::map<Key, WeakValue> m_weakMap;
+    std::mutex m_lock;
+};
+
+template<typename Key, typename BareValue>
+class ConcurrentCache<Key, std::shared_ptr<BareValue>>
+{
+private: // types
+  typedef std::shared_ptr<BareValue> Value;
+  typedef std::shared_future<Value> ValuePlaceholder;
+  typedef lru_cache<Key, ValuePlaceholder> Impl;
+
+public: // types
+  explicit ConcurrentCache(size_t maxEntries)
+    : impl_(maxEntries)
+  {}
+
+  // Gets the entry corresponding to the given key. If the entry is not in the
+  // cache, it is obtained by calling f() (without any arguments) and the
+  // result is put into the cache.
+  //
+  // The cache as a whole is locked only for the duration of accessing
+  // the respective slot. If, in the case of a cache miss, the generation
+  // of the missing element takes a long time, only attempts to access that
+  // element will block - the rest of the cache remains open to concurrent
+  // access.
+  template<class F>
+  Value getOrPut(const Key& key, F f)
+  {
+    std::promise<Value> valuePromise;
+    std::unique_lock<std::mutex> l(lock_);
+    const auto x = impl_.getOrPut(key, valuePromise.get_future().share());
+    l.unlock();
+    if ( x.miss() ) {
+      // Try to get back the shared_ptr from the weak_ptr first.
+      try {
+        valuePromise.set_value(m_weakStore.get(key));
+      } catch(const std::runtime_error& e) {
+        try {
+          const auto value = f();
+          valuePromise.set_value(value);
+          m_weakStore.add(key, value);
+        } catch (std::exception& e) {
+          drop(key);
+          throw;
+        }
+      }
+    }
+
+    return x.value().get();
+  }
+
+  bool drop(const Key& key)
+  {
+    std::unique_lock<std::mutex> l(lock_);
+    return impl_.drop(key);
+  }
+
+  size_t setMaxSize(size_t new_size) {
+    std::unique_lock<std::mutex> l(lock_);
+    return impl_.setMaxSize(new_size);
+  }
+
+protected: // data
+  std::mutex lock_;
+  Impl impl_;
+  WeakStore<Key, BareValue> m_weakStore;
+};
+
 } // namespace kiwix
 
 #endif // ZIM_CONCURRENT_CACHE_H
diff --git a/test/lrucache.cpp b/test/lrucache.cpp
index 4e4f7fe35..ef3aa8789 100644
--- a/test/lrucache.cpp
+++ b/test/lrucache.cpp
@@ -107,3 +107,24 @@ TEST(ConcurrentCacheTest, handleException) {
   EXPECT_EQ(val, 888);
 }
 
+TEST(ConcurrentCacheTest, weakPtr) {
+  kiwix::ConcurrentCache<int, std::shared_ptr<int>> cache(1);
+  auto refValue = cache.getOrPut(7, []() { return std::make_shared<int>(777); });
+  EXPECT_EQ(*refValue, 777);
+  EXPECT_EQ(refValue.use_count(), 2);
+
+  // This will drop shared(777) from the cache
+  cache.getOrPut(8, []() { return std::make_shared<int>(888); });
+  EXPECT_EQ(refValue.use_count(), 1);
+
+  // We must get the shared value back from the weak_ptr we still hold
+  EXPECT_NO_THROW(cache.getOrPut(7, []() { throw std::runtime_error("oups"); return nullptr; }));
+  EXPECT_EQ(refValue.use_count(), 2);
+
+  // Drop all refs
+  cache.getOrPut(8, []() { return std::make_shared<int>(888); });
+  refValue.reset();
+
+  // Be sure we call the construction function again
+  EXPECT_THROW(cache.getOrPut(7, []() { throw std::runtime_error("oups"); return nullptr; }), std::runtime_error);
+}
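
For reference, the weak_ptr "resurrection" idea described in the commit message can be illustrated
outside of the kiwix classes. The sketch below is not part of the patch: TinyWeakCache, getOrCreate
and the other names in it are purely illustrative, and it omits the promise/shared_future machinery
of ConcurrentCache so that only the check-the-WeakStore-before-recreating step remains.

#include <map>
#include <memory>
#include <mutex>

template<typename Key, typename T>
class TinyWeakCache {
public:
  // Return the object for `key` if some caller still holds it alive,
  // otherwise (re)create it via `factory` and remember a weak reference.
  template<typename Factory>
  std::shared_ptr<T> getOrCreate(const Key& key, Factory factory)
  {
    std::lock_guard<std::mutex> guard(m_mutex);
    auto it = m_weak.find(key);
    if (it != m_weak.end()) {
      if (auto alive = it->second.lock()) {
        return alive;           // value still in use somewhere: reuse it
      }
      m_weak.erase(it);         // last user is gone: the weak_ptr expired
    }
    std::shared_ptr<T> created = factory();
    m_weak[key] = created;      // keep only a weak reference (unbounded map)
    return created;
  }

private:
  std::map<Key, std::weak_ptr<T>> m_weak;
  std::mutex m_mutex;
};

As in the new weakPtr unit test, while some caller still holds a shared_ptr returned earlier,
getOrCreate for the same key returns that same object without invoking the factory; once the last
shared_ptr is released, the weak_ptr expires and the factory runs again.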