实现支持并发安全的有序链表

C++并发

头文件

  • <atomic>
    • 该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数。
  • <thread>
    • 该头文件主要声明了 std::thread 类,另外 std::this_thread 命名空间也在该头文件中。
  • <mutex>
    • 该头文件主要声明了与互斥量(mutex)相关的类,包括 std::mutex 系列类,std::lock_guard, std::unique_lock, 以及其他的类型和函数。
  • <condition_variable>
    • 该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any。
  • <future>
    • 该头文件主要声明了 std::promise, std::package_task 两个 Provider 类,以及 std::future 和 std::shared_future 两个 Future 类,另外还有一些与之相关的类型和函数,std::async() 函数就声明在此头文件中。

这里主要使用<thread><mutex>实现。

例子

根据线程执行的顺序,可以分为detachjoin

#include <iostream>
#include <thread>

using namespace std;

void output(int i)
{
    cout << "This thread number is " << i << endl;
    cout << "Thread " << i << " has finished" << endl;
}

int main()
{
    for (int i = 0; i < 5; i++)
    {
        thread t(output, i);
        t.detach(); //t.join();
    }
    getchar();
    return 0;
}

执行命令:g++ -std=c++0x test.cpp -pthread -o test

detach的结果为:

linkedlist1

join的结果为:

linkedlist2

可以发现,detach方式,启动的线程自主在后台运行,当前的代码继续往下执行,不等待新线程结束。join方式,等待启动的线程完成,才会继续往下执行。

并发链表

这里使用lock coupling方法进行并发处理,简单来说:

  • 对于Add操作,每次需要锁住当前插入节点和前驱节点
  • 对于Remove操作,每次需要锁住删除节点和前驱节点
  • Contain操作需要锁吗?(可以不需要)

这其实是一种很严格的锁,每次操作从头往后遍历,处理每个节点时,其他线程都不能操作。

代码

Linkdedlist.h

#ifndef LINKEDLIST_H // include guard
#define LINKEDLIST_H

#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <fstream>

using namespace std;

class Node
{
  public:
    Node *next_;
    mutex mutex_;
    int key_;
    int value_;
    Node() : next_(nullptr){};
    Node(int key, int value) : next_(nullptr), key_(key), value_(value){};
};

void Add(Node *head, Node *node)
{
    if (node == nullptr || head == nullptr)
        return;
    printf("Inserting %d ....\n", node->key_);
    // need to lock first
    head->mutex_.lock();
    Node *prev = head;
    Node *current = head->next_;
    while (current != nullptr)
    {
        current->mutex_.lock();
        if (node->key_ < current->key_)
        {
            prev->next_ = node;
            node->next_ = current;
            printf("Interted %d success!\n", node->key_);
            current->mutex_.unlock();
            prev->mutex_.unlock();
            return;
        }
        else if (node->key_ == current->key_)
        {
            printf("Inserting %d failed! duplicated key\n", node->key_);
            current->mutex_.unlock();
            prev->mutex_.unlock();
            return;
        }
        else
        {
            prev->mutex_.unlock();
            prev = current;
            current = current->next_;
        }
    }
    // insert to the last
    printf("Interted %d success to the last!\n", node->key_);
    prev->next_ = node;
    prev->mutex_.unlock();
}

void Remove(Node *head, int key)
{
    if (head == nullptr)
        return;
    printf("Romoving %d ....\n", key);
    head->mutex_.lock();
    Node *prev = head;
    Node *current = head->next_;
    while (current != nullptr && current->key_ < key)
    {
        current->mutex_.lock();
        prev->mutex_.unlock();
        prev = current;
        current = current->next_;
    }
    if (current != nullptr && current->key_ == key)
    {
        // its important to lock current node to ensure delete safely
        current->mutex_.lock();
        prev->next_ = current->next_;
        delete current;
        prev->mutex_.unlock();
        printf("Romove %d success!\n", key);
    }
    else
    {
        prev->mutex_.unlock();
        printf("Romove %d failed! Key not exited!\n", key);
    }

    return;
}

bool Contain(Node *head, int key)
{
    if (head == nullptr)
        return false;
    Node *prev = head;
    Node *current = head->next_;
    while (current != nullptr)
    {
        // prev->mutex_.lock();
        // current->mutex_.lock();
        if (current->key_ == key)
        {
            // current->mutex_.unlock();
            // prev->mutex_.lock();
            return true;
        }
        else if (current->key_ < key)
        {
            // prev->mutex_.unlock();
            // current->mutex_.unlock();
            prev = current;
            current = current->next_;
        }
        else
            return false;
    }
    return false;
}

void Print(Node *head)
{
    head->mutex_.lock();
    ofstream fout("result.txt");
    fout << "----------PRINT----------" << endl;
    for (Node *current = head->next_; current != nullptr; current = current->next_)
    {
        fout << current->key_ << ":" << current->value_ << endl;
    }
    head->mutex_.unlock();
    fout << endl;
}

#endif

Test.cpp

#include <iostream>
#include <thread>
#include <vector>
#include "LinkedList.h"
using namespace std;

//test for add + remove thread
void ARTest(Node *head, Node *node, int dlkey)
{
    Add(head, node);
    Remove(head, dlkey);
}

//test for Remove + Add Thread
void RATest(Node *head, Node *node, int dlkey)
{
    Remove(head, dlkey);
    Add(head, node);
}

int main()
{
    auto head = new Node(0, 0);

    //test for AddThreads
    vector<thread> AddThreads;
    for (int i = 0; i < 1000; ++i)
    {
        auto node = new Node(i, i + 100);
        AddThreads.emplace_back(Add, head, node);
    }
    for (auto &thread1 : AddThreads)
    {
        thread1.join();
    }

    //  //test for RemoveThreads
    //  vector<thread> RemoveThreads;
    //  for(int i = 0; i < 1000; i = i + 2){
    //    RemoveThreads.emplace_back(Remove, head, i);
    //  }
    //  for (auto& thread : RemoveThreads) {
    //    thread.join();
    //  }

     //test for Add + Remove Thread
    //  vector<thread> ARThreads;
    //  for(int i = 0; i < 1000; i = i + 2){
    //    auto node1 = new Node(i+1,i+100);
    //    ARThreads.emplace_back(ARTest,head,node1,i);
    //  }
    //  for (auto& thread : ARThreads) {
    //    thread.join();
    //  }

    // test for Remove + Add Thread
    vector<thread> RAThreads;
    for (int i = 0; i < 1000; i = i + 2)
    {
        auto node2 = new Node(i + 1, i + 100);
        RAThreads.emplace_back(RATest, head, node2, i);
    }
    for (auto &thread2 : RAThreads)
    {
        thread2.join();
    }
    Print(head);
    return 0;
}

注意点

  1. 每次操作,需要首先锁住head头指针!!!
  2. 需要判断是否已经存在同样的key。

运行

执行g++ -std=c++0x testList.cpp -pthread,结果保存在result.txt文件中

linkedlist3

results matching ""

    No results matching ""