پیاده‌سازی یک پشتهٔ Lock-Free - قسمت اول

در این پست(و احتمالا پست بعدی) راجع به پیاده‌سازی کردن یک Stack به شکل lock-free صحبت می‌کنیم.

پیاده‌سازی یک پشتهٔ Lock-Free - قسمت اول

سلام. در این پست(و احتمالا پست بعدی) راجع به پیاده‌سازی یک Stack به شکل lock-free صحبت می‌کنیم.

همونطوره توی پست قبلی دیدیم، ساختمان‌داده‌های lock-free بر اساس متغییرها و عملیات‌های اتمیک و memory orderهای مربوط به اونها پیاده‌سازی میشن. اولش ما از memory_order_seq_cst استفاده می‌کنیم تا راحت‌تر بتونیم کلیّات ساختمان‌داده رو پیاده‌سازی بکنیم. وقتی به مرحله‌ای رسیدیم که یک ساختمان‌دادهٔ lock-free داشتیم که به‌درستی کار می‌کرد میتونیم محدودیت‌های memory order رو کمتر بکنیم و از چیزهایی مثل memory_order_acquire و memory_order_release یا حتی memory_order_relaxed استفاده کنیم.

بسته به نیازمندی‌هامون و پلتفرمی که می‌خوایم کد رو روش اجرا بکنیم باید تصمیم بگیریم که چه نوع پیاده‌سازی‌ای داشته باشیم. بعضی وقت‌ها شاید بهتر باشه که صرفا یک پیاده‌سازیِ Lock based داشته باشیم. بنابراین همیشه باید گزینه‌های مختلفی که جلومون هست رو ‌به دقت Profile کنیم و در نهایت بهترین گزینه رو انتخاب کنیم. ساده‌ترین ساختمان‌داده‌ای که میشه به صورت Lock-Free پیاده‌سازی کرد، پشته یا استک (Stack) هست.

پیاده‌سازی push

همونطور که می‌دونیم، استک به شکل LIFO یا Last In First Out کار میکنه. بنابراین چیزی که اینجا برای ما مهم هست اینه که مطمئن بشیم زمانی که یک داده توسط یک ترد وارد استک ما میشه، بتونه درجا توسط یک ترد دیگه از استک هم خارج بشه و مشکلی هم پیش نیاد. نکتهٔ دیگه‌ای که اینجا اهمیت داره اینه که باید مطمئن بشیم دوتا ترد همزمان یک مقدار رو از استک نمی‌خونن و حتما المان‌های متفاوتی برمیگردونن.

ساده‌ترین پیاده‌سازی‌ای که برای استک میتونیم داشته باشیم یک لیست پیوندی هست که اشاره‌گر head به اولین گرهٔ ما(یعنی آخرین المان‌ای که وارد استک شده) اشاره‌ می‌کنه و هر گره هم به گرهٔ بعد از خودش اشاره می‌کنه. به این ترتیب، اضافه کردن یک المان به استک ما شامل مراحل زیر هست:

  1. ساختن یک گره جدید
  2. مقداردهی اشاره‌گر next گره جدید به head
  3. مقداردهی head به گره جدید

حالا مشکلی که میتونه توی یک کد Concurrent پیش بیاد چیه؟ اینکه ممکنه دوتا ترد همزمان بخوان دادهٔ خودشون رو وارد استک بکنن و اونوقت توی قدم شمارهٔ ۲ و ۳ ما Race Condition خواهیم داشت چرا که ممکنه زمانی که یک ترد قدم دوم رو برداشته، یک ترد دیگه مقدار head‍ رو عوض کنه و به این ترتیب کمترین مشکلی که میتونه پیش بیاد اینه که یکی از داده‌هامون رو از دست میدیم.

کاری که میتونیم برای حل کردن این مشکل انجام بدیم این هست که از یک عملیات compare-exchange توی قدم سوم استفاده بکنیم و به این ترتیب مطمئن بشیم که مقدار head در حال حاضر همونی هست که توی مرحلهٔ دوم خوندیم. اگر مقدار head عوض شده بود، دوباره برمیگردیم به قدم شمارهٔ ۲ و دوباره سعی میکنیم کار رو انجام بدیم.

کد زیر یک مثال از چنین پیاده‌سازی‌ای هست:

template <typename T>
class LockFreeStack
{

private:
    struct Node
    {
        T data;
        Node* next_node;

        Node (const T& data)
            : data(data)
        {

        }
    };

    std::atomic<Node*> _head;

public:

    void push (const T& data)
    {
        Node* new_node = new Node(data);
        new_node->next_node = _head.load(); // Checkpoint 1

        while (!_head.compare_exchange_weak(new_node->next_node, new_node)) // Checkpoint 2
        {

        }
    }

};

همونطور که دیدیم، صرفا با استفاده از یک عملیات compare-exchange این مشکل رو حل کردیم و حالا در Checkpoint 3 مطمئن میشیم که مقدارِ head_، همونی هست که در Checkpoint 2 خوندیم. اما بالاتر گفتیم که اگر مقدار head_ تغییر کرد باید دوباره مراحل رو انجام بدیم. این چی؟ این مورد هم با استفاده از همین عملیات compare-exchange عزیز انجام شده چراکه این عملیات به شکلی هست که اگر مقدار آرگومان اول(در اینجا new_node->next_node) برابر با مقدار متغییر اتمیکِ مورد نظرمون(head_) نباشه، خودش مقدارِ فعلی new_node->next_node رو با head_ جایگزین می‌کنه و از اونجایی که همهٔ اینها توی یک حلقه هست و داره تکرار میشه، دقیقا همونکاری که ما میخوایم رو داره انجام میده.

نکتهٔ بعدی این هست که چون ما اینجا داریم عملیات compare-exchange رو در یک حلقه انقدر تکرار می‌کنیم تا به درستی انجام بشه، نیازی نیست که از compare_exchange_strong استفاده کنیم و صرفا استفاده از compare_exchange_weak کفایت می‌کنه و می‌تونه احتمالا پرفورمنس بهتری رو برامون فراهم کنه.

پیاده‌سازی pop

حالا که تردهامون میتونن داده به استک اضافه بکنن، زمان این هست که قابلیت برداشتن داده از استک رو هم پیاده‌سازی کنیم. مراحل برداشتن یک داده از استک به این صورت خواهد بود:

  1. خوندن مقدار فعلی head
  2. خوندن مقدار head->next
  3. مقداردهی head به head->next
  4. برگردوندن data از گره مورد نظر
  5. آزاد کردن حافظهٔ گره‌ مورد نظر

اگر دوتا ترد همزمان بخوان این عملیات رو انجام بدن، بدیهی‌ترین جایی که ممکنه Race Condition پیش بیاد باز هم مربوط میشه به تغییر کردن مقدار head. ممکنه دو ترد همزمان قدم اول رو بردارن و یک مقدار از head رو بخونن ولی یکی از تردها به سرعت همهٔ مراحل رو طی کنه و حافظه‌ای که head بهش اشاره می‌کرد رو پاک کنه. حالا ترد دوم که سعی میکنه از head استفاده کنه، باعث ایجاد UB خواهد شد. درواقع این موضوع، یکی از مهم‌ترین مشکلات پیاده‌سازیِ یک ساختمان دادهٔ Lock-free و یکی از سخت‌ترین‌ها برای حل هست. برای همین، فعلا قدم پنجم رو فاکتور می‌گیریم و میذاریم memory leak داشته باشیم.

مشکل بعدی‌ای که هست اینه که اگر دوتا ترد همزمان مقدار head رو بخونن، جفتشون یک مقدار رو برمیگردونن که این خلاف چیزی هست که روش توافق کردیم اما خوشبختانه حل کردنش خیلی راحت‌تر از مشکل آزادسازی حافظه‌ست. برای حل کردنش همونکاری رو که توی تابع push انجام دادیم، اینجا هم انجام میدیم و دست به دامن compare-exchange برای تغییر دادن مقدار head خواهیم شد. درواقع توی مرحلهٔ سوم با استفاده از compare-exchange چک میکنیم که اگر مقدار head فعلیِ ما با چیزی که قبلا خونده شده متفاوت بود(حالا در اثر اینکه یک ترد دیگه به شکل همزمان یک المان جدید اضافه کرده یا یک المان رو حذف کرده)، ادامه ندیم و از اول مراحل رو دوباره تکرار کنیم. در نهایت زمانی که عملیات compare-exchange با موفقیت کار خودش رو انجام داد، میتونیم مطمئن بشیم که ما تنها تردی هستیم که این مقدارِ خاص رو از استک دریافت کردیم و حالا میتونیم مقداری که دریافت کردیم رو به caller برگردونیم:

void pop (T& data)
{
    Node* node_to_retrieve = _head.load();
    while (!_head.compare_exchange_weak(node_to_retrieve, node_to_retrieve->next_node))
    {

    }
    data = node_to_retrieve->data; // Checkpoint 1
}

اما با این پیاده‌سازی هم، همچنان یک‌سری مشکلات هست. اولین چیزی که به چشممون میخوره این هست که اگر استک ما خالی باشه(یعنی head_ برابر با nullptr باشه)، اونوقت UB خواهیم داشت. مشکل بعدی، بحث Exception Safety هست. اگر زمانی که داریم سعی می‌کنیم در checkpoint 1 داده‌مون رو کپی کنیم یک استثنا رخ بده، داده‌مون رو از دست خواهیم داد.

برای حل مشکل اول میتونیم صرفا توی شرطِ مربوط به while چک کنیم که head_ معتبر باشه. برای مشکل دوم هم میتونیم دادهٔ موجود در گره رو به شکل یک اشاره‌گر هوشمند ذخیره کنیم و یک اشاره‌گر برگردونیم. بهترین انتخاب برای اینجا هم <>std::shared_ptr هست return کردنش هیچ استثنا ای برنمیگردونه. در نهایت کدمون فعلا اینطوری خواهد بود:

#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack
{

private:
    struct Node
    {
        std::shared_ptr<T> data;
        Node* next_node;

        Node (const T& arg_data)
            : data(std::make_shared<T>(arg_data))
        {

        }
    };

    std::atomic<Node*> _head;

public:

    void push (const T& data)
    {
        Node* new_node = new Node(data);
        new_node->next_node = _head.load();

        while (!_head.compare_exchange_weak(new_node->next_node, new_node))
        {

        }
    }

    std::shared_ptr<T> pop (T& data)
    {
        Node* node_to_retrieve = _head.load();
        while (node_to_retrieve && !_head.compare_exchange_weak(node_to_retrieve, node_to_retrieve->next_node))
        {

        }
        return node_to_retrieve ? node_to_retrieve->data : std::shared_ptr<T>();
    }
};

خب. تقریبا دیگه کارمون برای پیاده‌سازیِ این ساختمان داده تموم شده(بجز بحث مدیریت حافظه). تنها نکته‌ای که باقی مونده و مهمه که بهش توجه داشته باشیم اینه که بدونیم این ساختمان داده اگرچه lock-free هست اما wait-free نیست و اون دوتا حلقهٔ whileای که داریم در تئوری میتونن تا همیشه ادامه داشته باشن.

بریم سراغ حل کردن مشکل نشت حافظه.

حل مشکل نشت حافظه

خب، اساسا کاری که ما می‌خوایم انجام بدیم اینه که حافظهٔ یک گره(node) رو زمانی آزاد کنیم که هیچ ترد دیگه‌ای امکان دسترسی به اون گره رو نداشته باشه. یکبار دیگه که کد رو بررسی کنیم، می‌بینیم تنها جایی که تردها به گره‌های «داخل» استک دسترسی پیدا می‌کنن، تابع ()pop هست چرا که توی ()push بعد از اضافه شدن گره به استک، دیگه بهش دست نمی‌زنیم و هرکاری که میخوایم روی اون گره انجام بدیم قبل از اضافه شدنش به استک‌مون اتفاق میوفته.

بنابراین اگر معماری برنامه‌ای که قراره از این استک استفاده کنه طوری باشه که فقط یک ترد عملیات ()pop رو انجام بده، مشکلی نخواهد بود و میشه همونجا در آخر تابع ()pop بیایم و حافظهٔ گره رو آزاد کنیم. اینجا دقیقا متوجه می‌شیم که منظور از جمله‌ای که توی پاراگراف‌های اول این پست نوشتم چیه:

بسته به نیازمندی‌هامون و پلتفرمی که می‌خوایم کد رو روش اجرا بکنیم باید تصمیم بگیریم که چه نوع پیاده‌سازی‌ای داشته باشیم.

یعنی چی؟ یعنی اگر صرفا بخوایم از این ساختمان داده به شکل Single Producer-Single Consumer یا Multiple Producer-Single Consumer استفاده کنیم، پیچیدگی کدمون خیلی کمتر و پرفورمنسمون خیلی بیشتر خواهد بود. اما اگر واقعا نیاز داشتیم که به صورت Multiple Consumer استفاده کنیم چی؟

نیاز داریم تا راهی پیدا کنیم که طی اون متوجه بشیم چه موقعی برای آزاد کردن حافظهٔ یک گره مناسبه. و راهی که داریم اینه که یک Garbage collector پیاده‌سازی کنیم😬 خوبیش اینه که فقط باید حواسمون به گره‌‌هایی باشه که از تابع ()pop بهشون دسترسی پیدا میکنیم.

راهی که کتاب پیشنهاد داده این هست که یک لیست انتظار داشته باشیم و گره‌هایی که از استک برمیداریم رو به این لیست انتظار اضافه کنیم. بعد، زمانی که هیچ تردی توی تابع ()pop نبود(پس کسی نیست که دسترسی‌ای به گره‌های داخل استک داشته باشه)، میایم و حافظهٔ گره‌هایی که توی این لیست قرار دارن رو آزاد می‌کنیم. حالا چطور بفهمیم که آیا تردی توی تابع ()pop هست یا نه؟ واقعا این مورد یکی از پیچیده‌ترین کدهای عمرتون خواهد بود. مطمئنم.

دروغ گفتم. فقط کافیه که یه شمارنده اول تابع بذاریم و هر تردی که وارد تابع ()pop میشه، یکی به اون شمارنده اضافه کنیم و درست وقتی میخواد از تابع خارج بشه، یکی از شمارنده کم میکنیم :) زمانی که این شمارندهٔ ما برابر با صفر بود یعنی هیچ تردی نیست که بخواد اذیتمون کنه و با خیال راحت می‌تونیم حافظهٔ گره‌های توی لیست انتظار رو آزاد کنیم.

در نهایت کد ما چنین چیزی خواهد بود:

#include <atomic>
#include <memory>

template <typename T>
class LockFreeStack
{

private:
    struct Node
    {
        std::shared_ptr<T> data;
        Node* next_node;

        Node (const T& arg_data)
            : data(std::make_shared<T>(arg_data))
        {

        }
    };

    std::atomic<Node*> _head;
    std::atomic<Node*> _nodes_to_delete_list;
    std::atomic<uint> _threads_in_pop; // Checkpoint 1


    void tryFree (const Node* node)
    {
        if (_threads_in_pop == 1) // Checkpoint 3
        {
            // We are the only thread in the pop()
            Node* current_delete_list = _nodes_to_delete_list.exchange(nullptr);
            if ((--_threads_in_pop) == 0) // Checkpoint 4
            {
                // We are confident that no one has access to the "to be deleted nodes" list
                freeNodeList(current_delete_list);

            }
            else if (current_delete_list)
            {
				// Checkpoint 7
                updateNodeList(current_delete_list);
            }

            delete node; // Checkpoint 5
        }
        else
        {
			// Checkpoint 6
            if (node)
            {
            	updateNodeList(node, node);
            }
			--_threads_in_pop;
        }
    }

    void freeNodeList (Node* node_list)
    {
        while (node_list)
        {
            Node* next_node = node_list->next_node;
            delete node_list;
            node_list = next_node;
        }
    }

    void updateNodeList (const Node* node_list)
    {
        Node* last_node = node_list;
        while (last_node->next_node)
        {
            last_node = last_node->next_node;
        }
        updateNodeList(node_list, last_node);
    }

    void updateNodeList (const Node* first, const Node* last)
    {
        last->next_node = _nodes_to_delete_list.load();
        while (!_nodes_to_delete_list.compare_exchange_weak(last->next_node, first))
        {

        }
    }

public:

    void push (const T& data)
    {
        Node* new_node = new Node(data);
        new_node->next_node = _head.load();

        while (!_head.compare_exchange_weak(new_node->next_node, new_node))
        {

        }
    }

    std::shared_ptr<T> pop (T& data)
    {
        ++_threads_in_pop;

        Node* node_to_retrieve = _head.load();
        while (node_to_retrieve && !_head.compare_exchange_weak(node_to_retrieve, node_to_retrieve->next_node))
        {

        }

        std::shared_ptr<T> result;
        if (node_to_retrieve)
        {
            result.swap(node_to_retrieve->data); // Checkpoint 2
        }
		tryFree(node_to_retrieve);

        return result;
    }
};

اون شمارنده‌ای که گفتیم، در Checkpoint 1 تعریف کردیم و همونطور که گفتیم کارش این هست که تعداد تردهایی که در یک لحظه در تابع ()pop هستند رو بشماره. برمیگردیم به تابع ()pop نگاه می‌کنیم. می‌بینیم که در Checkpoint 2 از متود ()swap استفاده کردیم. دلیلش چیه؟ همونطور که بالاتر گفتم، ما قرار هست یک لیست انتظار از گره‌هایی که حافظه‌شون باید آزاد بشه رو داشته باشیم و ازش استفاده کنیم. ولی ما به داده‌ای که درون گره‌ها ذخیره شده نیازی نداریم. چرا الکی حافظهٔ بیشتری مصرف کنیم؟ پس میایم و دادهٔ استخراج شده از استک رو صرفا منتقل می‌کنیم به Caller تابع ()pop و اون خودش مسئولیت مدیریت حافظهٔ داده رو به عهده می‌گیره. بعدش می‌رسیم به تابع ()tryFree که عملیات Garbage Collection ما اینجا انجام می‌گیره.

زمانی که ما وارد تابع ()tryFree می‌شیم، دو حالت بیشتر وجود نداره:

  1. ما تنها تردی هستیم که توی تابع ()pop هستیم. پس threads_in_pop_ برابر با ‍۱ هست.
  2. بجز ما ترد دیگه‌ای هم داخل تابع ()pop هست. پس threads_in_pop_ برابر با ۱ نیست.

فرض بگیریم که در حالت شمارهٔ ۱ قرار داریم:

توی این حالت ما می‌تونیم حافظهٔ اون گره‌ای که تازه از استک خارج کردیم رو آزاد کنیم. همچنین می‌تونیم تلاش کنیم و ببینیم که آیا شرایط برای آزاد کردن بقیهٔ گره‌هایی که در لیست انتظار قرار دارن فراهم هست یا نه؟

کاری که باید بکنیم این هست که لیستِ انتظاری که درحال حاضر موجود هست رو برداریم برای خودمون تا تردهای دیگه بهش دست نزنن. اینکار رو با یک عملیات atomic exchange انجام میدیم. حالا که لیست رو برداشتیم، مقدار threads_in_pop_ رو بررسی می‌کنیم تا مطمئن بشیم که هیچ تردی در حال حاضر توی ()pop نیست(پایینتر توضیح میدم که چرا دوباره اینجا بررسیش کردیم). اگر مطمئن شدیم که هیچ ترد دیگه‌ای نیست، حافظهٔ گره‌هایی که توی لیست هستند رو آزاد می‌کنیم. ولی اگر ترد دیگه‌ای وارد ()pop شده بود، لیست انتظاری که خودمون برداشته بودیم رو دوباره می‌ذاریم سر جاش(Checkpoint 7). خیلی توضیحش نمیدم که چطور اینکار رو می‌کنیم چون پیچیدگی خاصی نداره. صرفا نکته‌ش اینه که با لیست nodes_to_delete_list_ به شکل یک لیست پیوندی برخورد می‌کنیم و لیستِ فعلی‌ای که داریم رو به اولِ nodes_to_delete_list_ اضافه می‌کنیم.

در نهایت هم حافظهٔ گره‌ای که از طریق تابع ()pop دریافت کرده بودیم رو آزاد می‌کنیم.

حالا اگر فرض کنیم در حالت شمارهٔ ۲ قرار داریم چی؟ خیلی ساده‌ست. گره‌ای که از تابع ()pop دریافت کردیم رو به لیستِ انتظار اضافه می‌کنیم و از شمارندهٔ threads_in_pop_ هم یکی کم می‌کنیم.

برمیگردیم به Checkpoint 4، جایی که از مقدار threads_in_pop_ یکی کم کردیم و بررسی کردیم که مقدار جدیدش برابر با صفر باشه. درواقع با صفر شدنش ما متوجه میشیم که هیچ ترد دیگه‌ای دسترسی به لیست انتظاری که ما برداشتیم نداره. اما دقیقا مشکل کجا بود که ما اینکار رو کردیم؟ مسئله اینه که در فاصلهٔ بین Checkpoint 3 و عملیات atomic exchange ای که بعدش قرار داره، ممکنه تردهای دیگه‌ای وارد تابع ()pop بشن و ممکن هست بعضیاشون گرهٔ جدیدی به لیست اضافه کنن. خودِ نفس اضافه کردن گره جدید به لیست انتظار خیلی مشکل نیست. مشکل اینجاست که ممکنه اون گره‌ای که به لیست اضافه شده، یه ترد دیگه بهش دسترسی پیدا کرده باشه. فرض کنید اون شرط موجود در Checkpoint 4 رو نداریم. و فرض کنید ما یک ترد هستیم که یک گره رو از استک برداشتیم و تنها تردی هم هستیم که در تابع ()pop قرار داره. عملیات‌ها رو ادامه میدیم و میرسیم به Checkpoint 3 و بعد از اجرا شدن اون عملیات، suspend میشیم. بعد، یک ترد دیگه وارد تابع ()pop میشه و از طریق متغییر node_to_retrieve به یک گره مثلا A دسترسی پیدا می‌کنه و مثلا همونجا suspend میشه. حالا یک ترد دیگه میاد و وارد تابع ()pop میشه. اون هم به گرهٔ A دسترسی پیدا می‌کنه و همهٔ مراحل رو هم طی می‌کنه و در نهایت گرهٔ A رو به لیستِ انتظار اضافه می‌کنه. حالا اگر تردِ اولیهٔ ما که suspend شده بود، از suspend دربیاد، میره و حافظهٔ تمام گره‌هایی که در لیستِ انتظار قرار دارند رو آزاد می‌کنه. و تمام. به فنا رفتیم! اون تردِ دومی که متغییر node_to_retrieve رو داشت، از suspend درمیاد و سعی می‌کنه به حافظه‌ای که آزاد شده دسترسی پیدا کنه :) و UB خواهیم داشت.

جمع‌بندی

راه‌حلی که اینجا باهمدیگه دیدیم، کامل نیست. همونطور که دیدیم، این راه‌حل متکی به این هست که یه بازهٔ زمانی‌ای بوجود بیاد که فقط یک ترد داخل تابع ()pop باشه. این روش احتمالا در سیستم هایی که load پایینی دارند کار کنه، اما در سیستم‌هایی که load بالایی دارند، احتمالا هیچوقت فرصتش پیش نمیاد که فقط یک ترد در تابع ()pop باشه. در نتیجه ما فقط یک لیست انتظار داریم که همش بزرگ و بزرگتر میشه و این هم عملا فرقی با memory leak نخواهد داشت. پس باید چه کنیم؟ راه‌حل این مشکل رو در پست بعدی باهمدیگه بررسی خواهیم کرد و اون هم چیزی نخواهد بود جز استفاده از Hazard Pointers :)

عزت زیاد.