پیاده‌سازی یک پشتهٔ Lock-Free - قسمت دوم: اشاره‌گرهای خطری

در این پست، سعی کردیم مدیریت حافظهٔ ساختمان‌داده‌ای که نوشتیم رو با استفاده از Hazard Pointer ها انجام بدیم.

پیاده‌سازی یک پشتهٔ Lock-Free - قسمت دوم: اشاره‌گرهای خطری

در پستِ قبلی راجع به پیاده‌سازی کردن یک پشته به شکل Lock-free صحبت کردیم و یک Garbage collector ساده هم براش نوشتیم که صرفا یک لیست انتظار از گره‌ها رو نگهداری می‌کرد و دیدیم که چنین پیاده‌سازی‌ای در سیستم‌هایی با لود بالا جواب نخواهد داد و درنهایت بازهم نشت حافظه خواهیم داشت. توی این پست راجع به یک روش دیگه به اسم «اشاره‌گرهای خطری» :) یا Hazard Pointers صحبت می‌کنیم. اشاره‌گرهای خطری اسم تکنیکی هست که آقای Maged M. Michael برای مدیریت حافظه در ساختمان‌داده‌های Lock-free در سال ۲۰۰۲ اختراع کرد. ایشون زمانی که این تکنیک رو اختراع کرد در IBM کار می‌کرد و هم‌اکنون در شرکت فیسبوک کار می‌کنه و سال ۲۰۲۲ جایزه Dijkstra رو بخاطر اختراع این تکنیک برنده شد(آدم خفنیه انصافا...).

دلیل اینکه چنین اسمی بهش میدیم این هست که آزاد کردن حافظهٔ یک گره در حالی که یک تردی هنوز بهش رفرنس داره، خطرناکه. چون اگر به هر دلیلی اون ترد اقدام به دسترسی به اون حافظه بکنه، دچار UB می‌شیم.

ایدهٔ کلی این هست که اگر یک ترد قرار هست به یک Resource/Object/... ای دسترسی پیدا بکنه که احتمال دسترسی بهش توسط تردهای دیگه وجود داره، hazard pointer خودش رو روی اون آبجکت تنظیم می‌کنه. بنابراین اگر یک تردی بخواد حافظهٔ مربوط به آبجکت مورد نظر رو آزاد کنه، با چک کردن hazard pointer بقیهٔ تردها میتونه متوجه بشه که آیا این آبجکت خاص توسط بقیهٔ تردها درحال استفاده هست یا نه. یعنی چک کنه آیا hazard pointerای داریم که به این آبجکت اشاره بکنه یا نه. اگر هیچ hazard pointerای وجود نداشت که اینکارو بکنه، میتونه با خیال آسوده حافظهٔ مورد نظرش رو آزاد کنه در غیر این صورت اون آبجکت رو به لیست انتظار اضافه می‌کنه تا بعدا دوباره سر فرصت بررسیش کنه.

اولین چیزی که نیاز داریم این هست که به ازای هر تردی که قرار هست عملیات pop رو انجام بده، یک اشاره‌گر خطری داشته باشیم به‌طوری که این اشاره‌گر برای همهٔ تردهای دیگه قابل دسترسی و خوندن باشه. از اونجایی که پروسهٔ ساختن چنین اشاره‌گری ممکنه نکات خودش رو داشته باشه، فعلا فرض می‌کنیم که ما یک تابع به اسم ()getHazardPointerForCurrentThread داریم که اینکار رو برامون انجام میده و hazard pointer تردی که قرار هست pop بکنه رو بهمون برمیگردونه تا ازش استفاده کنیم. بنابراین تنها کاری که باید بکنیم اینه که آدرس nodeای که میخوایم ازش استفاده کنیم رو در hazard pointer ذخیره کنیم:

std::shared_ptr<T> popWithHP ()
{
    std::atomic<void*>& hp = getHazardPointerForCurrentThread();
    Node* node_to_retrieve = _head.load();  // Checkpoint 1
    Node* temp;
    do
    {
        temp = node_to_retrieve;
        hp.store(node_to_retrieve);         // Checkpoint 2
        node_to_retrieve = _head.load();
    } while (temp != node_to_retrieve);     // Checkpoint 3
}

برای اینکه مطمئن بشیم که در بین اجرا شدن Checkpoint 1 تا مقداردهی hp در checkpoint 2، مقدار head_ عوض نشده(مثلا ممکنه به طور همزمان یک ترد دیگه تابع pop رو اجرا کنه و گره اول رو برداره. به این ترتیب مقدار head_ عوض میشه)، عملیات مقداردهی رو در یک حلقه do while میذاریم.

حالا که Hazard Pointer خودمون رو مقداردهی کردیم می‌تونیم ادامهٔ عملیات pop()‎ رو انجام بدیم:

    std::shared_ptr<T> popWithHP ()
    {
        std::atomic<void*>& hp = getHazardPointerForCurrentThread();
        Node* node_to_retrieve = _head.load();  // Checkpoint 1
        Node* temp;
        do
        {
            // Loop until we’ve set the hazard pointer to head.
            do
            {
                temp = node_to_retrieve;
                hp.store(node_to_retrieve);         // Checkpoint 2
                node_to_retrieve = _head.load();
            } while (temp != node_to_retrieve);     // Checkpoint 3

        } while (_head.compare_exchange_strong(node_to_retrieve, node_to_retrieve->next_node));

        // We don't need to keep hp points to the node_to_retrieve anymore as no other thread can gain access to it from now
        hp.store(nullptr);

        std::shared_ptr<T> result;
        if (node_to_retrieve)
        {
            result.swap(node_to_retrieve->data);
            // Check if this node has any other hp pointing at it.
            if (hasOwner(node_to_retrieve))
            {
                // reclaim later
            }
            else
            {
                delete node_to_retrieve;
            }

            // here we delete waiting nodes with no hazard pointers
        }

        return result;
    }

چندتا نکته راجع به این کد وجود داره که بهتره باهم بررسیش کنیم:

  • حلقه‌ای که اشاره‌گر خطری رو مقداردهی می‌کرد رو داخل حلقه‌ای گذاشتیم که عملیات compare exchange رو انجام میده. دلیل اینکار این هست که در صورت fail شدن عملیات compare exchange و reload شدن مقدار node_to_retreive، باید دوباره Hazard Pointer خودمون رو مقداردهی بکنیم. با اینکار می‌تونیم مطمئن بشیم که اشاره‌گر node_to_retrieve به درستی توسط اشاره‌گر خطری محافظت میشه و برای dereference کردنش مشکلی نیست.
  • بعد از اینکه عملیات compare exchange انجام شد، عملا ما(= تردِ فعلی که تابع pop رو صدا زده) صاحب اون گره هستیم و ترد دیگه‌ای نمیتونه بعد از ما بهش دسترسی بگیره. بنابراین میتونیم اشاره‌گر خطری خودمون رو بازنشانی(reset) کنیم.
  • در مرحلهٔ بعد برای آزاد کردن حافظهٔ گره‌ای که گرفتیم، چک می‌کنیم که «آیا hazard pointerای وجود داره که به این گره اشاره بکنه؟». اگر جواب بله بود، گره رو در یک لیست ذخیره می‌کنیم تا دوباره بعدا بررسیش کنیم و اگر جواب خیر بود، خیلی عالیه و می‌تونیم همون لحظه حافظه‌ش رو آزاد کنیم.
  • و در آخر لیست گره‌هایی که قبلا ذخیره کردیم و چک می‌کنیم تا ببینیم آیا اونها رو میشه حذف کرد یا نه.

حالا زمان این هست که بریم پیاده‌سازی توابعی که جاشون رو خالی گذاشتیم رو ببینیم. توابعی که برای کار با اشاره‌گرهای خطری نیازشون خواهیم داشت.

پیاده‌سازیِ ما به این شکل خواهد بود:

namespace HazardPointerManager
{

template <typename T>
struct HazardPointer
{
    std::atomic<std::thread::id> id;
    std::atomic<T*> pointer;
};


template <typename T>
class HpOwner
{
private:
    
    constexpr static inline size_t _MAX_HAZARD_POINTERS = 100;
    
    HazardPointer<T>* _hazard_pointer;
    static std::array<HazardPointer<T>, _MAX_HAZARD_POINTERS> _hazard_pointers_array;


public:
    HpOwner()
        : _hazard_pointer{nullptr}
    {
        for (HazardPointer<T>& hp : _hazard_pointers_array)
        {
            std::thread::id null_id;
            if (hp.id.compare_exchange_strong(null_id, std::this_thread::get_id()))
            {
                _hazard_pointer = &hp;
                break;
            }
        }

        if (!_hazard_pointer)
        {
            throw std::runtime_error("No HazardPointer available!");
        }
    }

    ~HpOwner()
    {
        _hazard_pointer->pointer.store(nullptr);
        _hazard_pointer->id.store(std::thread::id());
    }

    std::atomic<T*>& getPointer ()
    {
        return _hazard_pointer->pointer;
    }

    static bool hasOwner (T* pointer)
    {
        for (auto& hp : _hazard_pointers_array)
        {
            if (hp.pointer.load() == pointer)
            {
                return true;
            }
        }

        return false;
    }

};

template <typename T>
std::array<HazardPointer<T>, HpOwner<T>::_MAX_HAZARD_POINTERS> HpOwner<T>::_hazard_pointers_array;


/**
 * A list of pop()-ed nodes from stack that we could not free immediately.
 * So we store them in this list and will later iterate over them to check
 * if any Hazard Pointer still refers to them or not. 
*/
template <typename T>
class ReclaimList
{

private:

    struct ListNode
    {
        T* data;
        ListNode* next;

        ListNode (T* data)
            : data{data}, next{nullptr}
        {

        }

        ~ListNode ()
        {
            delete data;
        }
    };

    std::atomic<ListNode*> _reclaim_list;

public:

    ReclaimList () = default;
    ~ReclaimList () = default;

    ReclaimList (const ReclaimList&) = delete;
    ReclaimList (ReclaimList&&) = delete;
    ReclaimList& operator=(const ReclaimList&) = delete;
    ReclaimList& operator=(ReclaimList&&) = delete;

    void addToList (T* pointer)
    {
        ListNode* node_to_add = new ListNode(pointer);
        addToList(node_to_add);
    }

    void addToList (ListNode* node_to_add)
    {
        node_to_add->next = _reclaim_list.load();
        while (!_reclaim_list.compare_exchange_weak(node_to_add->next, node_to_add))
        {

        }
    }

    void freeOrphanNodes ()
    {
        // Claim the list so no other threads can have further access to it
        ListNode* current_node = _reclaim_list.exchange(nullptr);

        while (current_node)
        {
            ListNode* next_node = current_node->next;

            // Check if the node has any Hazard Pointer refering to it. If any, add it back to the list.
            if (!HpOwner<T>::hasOwner(current_node->data))
            {
                delete current_node;
            }
            else
            {
                addToList(current_node);
            }

            current_node = next_node;
        }
    }
};

} // namespace HazardPointerManager

سعی کردم توی کد به اندازهٔ کافی کامنت بذارم که قابل فهم‌تر بشه. ولی بهرحال سعی می‌کنم اینجا هم به صورت خلاصه توضیحش بدم:

همونطور که بالاتر گفتیم، قرار هست هر تردی که عملیات pop رو انجام میده یک Hazard Pointer برای خودش داشته باشه و به گره‌ای که میخواد حذفش کنه اشاره کنه تا یک ترد دیگه قبل از آزاد کردن حافظهٔ اون گره، بره و Hazard Pointer بقیهٔ تردها رو بررسی کنه و ببینه آیا اشاره‌گری وجود داره که به گره موردنظر اشاره کنه یا نه. اگه اشاره‌گری نبود، حافظهٔ گره رو آزاد می‌کنه.

بنابراین ما یک لیست از اشاره‌گرهای خطری باید داشته باشیم که همهٔ تردها بتونن بهش دسترسی داشته باشن و بخوننش. راه‌حلی که اینجا پیش گرفتیم این هست که یک لیست با تعداد عضو ثابت(در اینجا ۱۰۰) داشته باشیم و همهٔ تردها با پیمایش این لیست/آرایه/... بتونن اشاره‌گر خطریِ بقیهٔ تردها رو ببینن. اسم لیست ما در این کد ‎_hazard_pointers_array هست.

اعضای این لیست، یک struct به نام HazardPointer هستن که این استراکت خودش دو عضو داره: شناسهٔ ترد و خودِ hazard pointer.

عملیات تخصیص یک Hazard Pointer به یک ترد، در تابع سازندهٔ کلاسِ HpOwner انجام میگیره: هربار موقع ساخته شدن یک شئ از این کلاس، ابتدا لیستِ اشاره‌گرهای خطری پیمایش میشه و اگر جای خالی‌ای وجود داشت، شناسه(id) تردی که داره شئِ HpOwner رو میسازه به شئ HazardPointer موجود در لیستمون تخصیص داده میشه.

اگر هیچ جای خالی‌ای وجود نداشته باشه، برنامه یک استثنا پرتاب می‌کنه.

به‌نظرم کد کلاس ReclaimList خیلی نیاز به توضیح نداره چرا که یک لیست پیوندیِ ساده هست و مشابه‌ش رو قبلا دیدیم.

اما چطور از اینها در تابع pop()‎ خودمون استفاده می‌کنیم؟ به شکل زیر:

template <typename T>
class LockFreeStack
{

private:

    struct Node
    {
        std::shared_ptr<T> data;
        Node* next_node;

        Node (const T& arg_data)
            : data(std::make_shared<T>(arg_data))
        {

        }
    };

    std::atomic<Node*> _head;
    HazardPointerManager::ReclaimList<Node> _hp_reclaim_list;


    /**
     * Create a object per thread so after first time there is no need to go through object creation.
    */
    std::atomic<Node*>& getHazardPointerForCurrentThread ()
    {
        static thread_local HazardPointerManager::HpOwner<Node> hp_owner;
        return hp_owner.getPointer();
    }


    bool hasOwner (Node* node)
    {
        return HazardPointerManager::HpOwner<Node>::hasOwner(node);
    }

public:

    LockFreeStack ()
        : _head{nullptr}, _nodes_to_delete_list{nullptr}, _threads_in_pop{0}
    {
    }

    ~LockFreeStack ()
    {
        Node* head = _head.load();

        while (head)
        {
            Node* next = head->next_node;
            delete head;
            head = next;
        }

        // Remove any remaining nodes in reclaim list
        _hp_reclaim_list.freeOrphanNodes();
    }

    void push (const T& data)
    {
        Node* new_node = new Node(data);
        new_node->next_node = _head.load();

        while (!_head.compare_exchange_weak(new_node->next_node, new_node))
        {

        }
    }

    std::shared_ptr<T> popWithHP ()
    {
        std::atomic<Node*>& hp = getHazardPointerForCurrentThread();
        Node* node_to_retrieve = _head.load();  // Checkpoint 1
        Node* temp;
        do
        {
            // Loop until we’ve set the hazard pointer to head.
            do
            {
                temp = node_to_retrieve;
                hp.store(node_to_retrieve);         // Checkpoint 2
                node_to_retrieve = _head.load();
            } while (temp != node_to_retrieve);     // Checkpoint 3

        } while (node_to_retrieve && !_head.compare_exchange_strong(node_to_retrieve, node_to_retrieve->next_node));

        // We don't need to keep hp points to the node_to_retrieve anymore as no other thread can gain access to it from now
        hp.store(nullptr);

        std::shared_ptr<T> result;
        if (node_to_retrieve)
        {
            result.swap(node_to_retrieve->data);
            // Check if this node has any other hp pointing at it.
            if (hasOwner(node_to_retrieve))
            {
                // reclaim later
                _hp_reclaim_list.addToList(node_to_retrieve);
            }
            else
            {
                // No Hazard Pointer refers to this node. Safe to delete immediatly.
                delete node_to_retrieve;
            }

            // here we delete waiting nodes with no hazard pointers
            _hp_reclaim_list.freeOrphanNodes();
        }

        return result;
    }
};

این کد هم بخش زیادیش همون چیزی هست که در ابتدای این پست دیدیم و تفاوتش در این هست که حالا پیاده‌سازیِ توابع getHazardPointerForCurrentThread()‎ و hasOwner()‎ رو داریم. به‌نظرم تنها بخشی که نیاز به توضیح داره، اولین تابع هست.

همونطور که توی کد مربوط به پیاده‌سازیِ HazardPointerManager دیدیم، یک hazard pointer در زمان ساخته شدن شئ HpOwner به یک ترد تخصیص داده میشه. پس برای اینکه بتونیم برای تردمون یک hazard pointer داشته باشیم، باید یک نمونه از HpOwner بسازیم.

کاری که در تابع  getHazardPointerForCurrentThread()‎ انجام دادیم این هست که اومدیم یک شئ رو به صورت static و thread_local تعریف کردیم :)

این‌ها باعث میشن که ما به ازای هر تردی که pop رو فراخوانی می‌کنه، تنها یکبار یک شئ HpOwner بسازیم و در دفعات بعد فقط کافیه از همون شئ استفاده کنیم و دیگه نیاز نیست دوباره شئ ساخته بشه یا جستجویی توی لیست انجام بگیره(البته نکاتی هم در استفاده از static از لحاظ پرفورمنس وجود داره که اینجا بهش نمی‌پردازم...).

در نهایت هنوز هم این پیاده‌سازی‌ای که دیدیم، کامل و قابل استفاده از محصول واقعی نیست. درسته که به شکل امن می‌تونه حافظهٔ گره‌های حذف شده از پشته رو آزاد بکنه اما مقدار زیادی هم سربار به سیستم تحمیل می‌کنه. هربار که می‌خوایم ببینیم «آیا Hazard Pointer ای وجود داره که به گره‌مون اشاره کنه؟»، باید یک لیست به اندازهٔ ‎_MAX_HAZARD_POINTERS رو پیمایش کنیم و این کار رو به ازای هربار صدا زدن تابع pop()‎ داریم انجام می‌دیم! و زمانی که می‌خوایم گره‌های یتیم(LOL) رو پیدا کنیم باید این کار رو به ازای همهٔ گره‌های موجود توی لیست انتظارمون انجام بدیم.

همهٔ اینها رو بذارید در کنار این مسئله که عملیات‌های اتمیک به صورت ذاتی حدود ۱۰۰ برابر کندتر از عملیات‌های عادی هستن. پس اونقدر هم که فکر می‌کردیم راه‌حل مناسبی نیست... اما بهرحال میتونیم تا حدی پرفورمنس رو بهتر بکنیم.

بهبود پرفورمنس Reclamation

یکی از راه‌هایی که میشه باهاش پرفورمنس این پیاده‌سازی رو بهتر کرد این هست که بیایم حافظه رو فدای پردازنده کنیم. یعنی ترجیح بدیم تعداد گره بیشتری منتظر بمونن اما مطمئن باشیم که سیکل‌های پردازنده هدر نمیرن: بجای اینکه بیایم به ازای هربار pop کردن چک کنیم آیا گره‌ای توی لیست انتظار هست یا نه، تنها زمانی لیست انتظار رو چک می‌کنیم که حداقل به تعداد ‎_MAX_HAZARD_POINTERS گره داخل لیست وجود داشته باشه. اینطوری میتونیم مطمئن باشیم که حداقل یک گرهٔ یتیم وجود داره که بخوایم حافظه‌ش رو آزاد کنیم.

پس کاری که انجام میدیم چیه؟ تنها زمانی شروع به چک کردن لیست انتظار می‌کنیم که اندازهٔ این لیست به تعداد ۲ برابر ‎_MAX_HAZARD_POINTERS باشه. چرا ۲ برابر؟ چون اینجوری مطمئن هستیم که حداقل تعداد ‎_MAX_HAZARD_POINTERS گره وجود داره که یتیم هستن. درواقع اینطور باید بهش نگاه کرد: تعداد hazard pointerهای ما، ‎_MAX_HAZARD_POINTERS هست. پس اگر لیستی داشته باشیم که اندازه‌ش ۲ برابر ‎_MAX_HAZARD_POINTERS باشه، در بدترین حالت فقط تعداد ‎_MAX_HAZARD_POINTERS گره توسط hazard pointerها دارن استفاده میشن. پس بقیهٔ گره‌های موجود در لیست، یتیم هستن.

اما این راه هم اونقدر مناسب نیست... برای نگه‌داشتن تعداد گرهٔ داخل لیست انتظار باید از یک متغییر جهت شمارش استفاده کنیم که اون هم باید اتمیک باشه. و همهٔ تردهای pop برای دسترسی به اون شمارنده و در نهایت لیست انتظار، باهمدیگه سر و کله می‌زنن.

می‌تونیم باز هم بیشتر حافظه رو فدای پردازنده بکنیم و به ازای هر تردِ pop، یک لیست انتظار داشته باشیم. یعنی یک لیست thread_local برای هر ترد. حالا دیگه شمارش و دسترسی به لیست برای تردها مشکلی نیست چون تمام دسترسی‌ها محلی خواهد بود. اما بجای ۲ برابر ‎_MAX_HAZARD_POINTERS، اینبار ما تعداد ‎_MAX_HAZARD_POINTERS * ‎_MAX_HAZARD_POINTERS رو داریم ذخیره می‌کنیم(تعداد تردهامون ‎_MAX_HAZARD_POINTERS هست و هر ترد هم حداقل یک لیست به اندازهٔ ‎_MAX_HAZARD_POINTERS خواهد داشت).

یک راه دیگه هم این هست که همون لیستِ global خودمون رو داشته باشیم اما تردهای pop اقدام به آزاد کردن حافظهٔ گره‌های اون لیست نکنن. فقط گرهٔ موردنظر خودشون رو اضافه کنن و برن پِی کار خودشون. بجاش، یک تردِ مخصوص چک کردن این لیست ایجاد می‌کنیم که به صورت دوره‌ای بیاد اعضای لیست رو چک کنه. از اونجایی که پیاده‌سازیش به صورت single-consumer میشه، همونطور که قبلا دیدیم هم پیچیدگی کمتری خواهد داشت و هم پرفورمنس بهتری رو داره.

جمع‌بندی

روش Hazard Pointer، ایدهٔ ساده‌ای داره اما وقتی وارد جزئیات پیاده‌سازیش در فضای concurrency میشیم، سخت میشه(مثل مابقی چیزها :)). جالبه که بدونیم سال ۲۰۱۸ یک پروپوزال برای نسخه‌های آیندهٔ سی++ ایجاد شد که ساختمان‌داده‌های concurrent رو با استفاده از Hazard Pointer و Read-Copy-Update داخل کتابخانه‌های استاندارد سی++ داشته باشیم :) البته طبق چیزی که متوجه شدم، از اونموقع هنوز بررسیش نکردن و تکلیفش مشخص نیست. اما اگر بجای تمرکز روی چیزهای کمتر مفید مثل Generic Programming و اضافه کردن پارادایم Functional، برای قابلیت‌های مفید هم وقت بذارن میشه امیدوار بود که این ساختمان‌داده‌ها رو در آینده داشته باشیم. شایدم نه.

عزت زیاد.