Add a template specialization for ConcurrentCache storing shared_ptr

When ConcurrentCache store a shared_ptr we may have shared_ptr in used
while the ConcurrentCache has drop it.
When we "recreate" a value to put in the cache, we don't want to recreate
it, but copying the shared_ptr in use.

To do so we use a (unlimited) store of weak_ptr (aka `WeakStore`)
Every created shared_ptr added to the cache has a weak_ptr ref also stored
in the WeakStore, and we check the WeakStore before creating the value.
This commit is contained in:
Matthieu Gautier 2022-05-24 16:22:06 +02:00
parent 2b38d2cf1b
commit e5ea210d2c
2 changed files with 128 additions and 0 deletions

View File

@ -95,6 +95,113 @@ protected: // data
std::mutex lock_; std::mutex lock_;
}; };
/**
WeakStore represent a thread safe store (map) of weak ptr.
It allows to store weak_ptr from shared_ptr and retrieve shared_ptr from
potential non expired weak_ptr.
It is not limited in size.
*/
template<typename Key, typename Value>
class WeakStore {
private: // types
typedef std::weak_ptr<Value> WeakValue;
public:
explicit WeakStore() = default;
std::shared_ptr<Value> get(const Key& key)
{
std::lock_guard<std::mutex> l(m_lock);
auto it = m_weakMap.find(key);
if (it != m_weakMap.end()) {
auto shared = it->second.lock();
if (shared) {
return shared;
} else {
m_weakMap.erase(it);
}
}
throw std::runtime_error("No weak ptr");
}
void add(const Key& key, std::shared_ptr<Value> shared)
{
std::lock_guard<std::mutex> l(m_lock);
m_weakMap[key] = WeakValue(shared);
}
private: //data
std::map<Key, WeakValue> m_weakMap;
std::mutex m_lock;
};
template <typename Key, typename RawValue>
class ConcurrentCache<Key, std::shared_ptr<RawValue>>
{
private: // types
typedef std::shared_ptr<RawValue> Value;
typedef std::shared_future<Value> ValuePlaceholder;
typedef lru_cache<Key, ValuePlaceholder> Impl;
public: // types
explicit ConcurrentCache(size_t maxEntries)
: impl_(maxEntries)
{}
// Gets the entry corresponding to the given key. If the entry is not in the
// cache, it is obtained by calling f() (without any arguments) and the
// result is put into the cache.
//
// The cache as a whole is locked only for the duration of accessing
// the respective slot. If, in the case of the a cache miss, the generation
// of the missing element takes a long time, only attempts to access that
// element will block - the rest of the cache remains open to concurrent
// access.
template<class F>
Value getOrPut(const Key& key, F f)
{
std::promise<Value> valuePromise;
std::unique_lock<std::mutex> l(lock_);
const auto x = impl_.getOrPut(key, valuePromise.get_future().share());
l.unlock();
if ( x.miss() ) {
// Try to get back the shared_ptr from the weak_ptr first.
try {
valuePromise.set_value(m_weakStore.get(key));
} catch(const std::runtime_error& e) {
try {
const auto value = f();
valuePromise.set_value(value);
m_weakStore.add(key, value);
} catch (std::exception& e) {
drop(key);
throw;
}
}
}
return x.value().get();
}
bool drop(const Key& key)
{
std::unique_lock<std::mutex> l(lock_);
return impl_.drop(key);
}
size_t setMaxSize(size_t new_size) {
std::unique_lock<std::mutex> l(lock_);
return impl_.setMaxSize(new_size);
}
protected: // data
std::mutex lock_;
Impl impl_;
WeakStore<Key, RawValue> m_weakStore;
};
} // namespace kiwix } // namespace kiwix
#endif // ZIM_CONCURRENT_CACHE_H #endif // ZIM_CONCURRENT_CACHE_H

View File

@ -107,3 +107,24 @@ TEST(ConcurrentCacheTest, handleException) {
EXPECT_EQ(val, 888); EXPECT_EQ(val, 888);
} }
TEST(ConcurrentCacheTest, weakPtr) {
kiwix::ConcurrentCache<int, std::shared_ptr<int>> cache(1);
auto refValue = cache.getOrPut(7, []() { return std::make_shared<int>(777); });
EXPECT_EQ(*refValue, 777);
EXPECT_EQ(refValue.use_count(), 2);
// This will drop shared(777) from the cache
cache.getOrPut(8, []() { return std::make_shared<int>(888); });
EXPECT_EQ(refValue.use_count(), 1);
// We must get the shared value from the weakPtr we have
EXPECT_NO_THROW(cache.getOrPut(7, []() { throw std::runtime_error("oups"); return nullptr; }));
EXPECT_EQ(refValue.use_count(), 2);
// Drop all ref
cache.getOrPut(8, []() { return std::make_shared<int>(888); });
refValue.reset();
// Be sure we call the construction function
EXPECT_THROW(cache.getOrPut(7, []() { throw std::runtime_error("oups"); return nullptr; }), std::runtime_error);
}