实现支持并发安全的有序链表
C++并发
头文件
<atomic>
- 该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数。
<thread>
- 该头文件主要声明了 std::thread 类,另外 std::this_thread 命名空间也在该头文件中。
<mutex>
- 该头文件主要声明了与互斥量(mutex)相关的类,包括 std::mutex 系列类,std::lock_guard, std::unique_lock, 以及其他的类型和函数。
<condition_variable>
- 该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any。
<future>
- 该头文件主要声明了 std::promise, std::package_task 两个 Provider 类,以及 std::future 和 std::shared_future 两个 Future 类,另外还有一些与之相关的类型和函数,std::async() 函数就声明在此头文件中。
这里主要使用<thread>
和<mutex>
实现。
例子
根据线程执行的顺序,可以分为detach
和join
:
#include <iostream>
#include <thread>
using namespace std;
void output(int i)
{
cout << "This thread number is " << i << endl;
cout << "Thread " << i << " has finished" << endl;
}
int main()
{
for (int i = 0; i < 5; i++)
{
thread t(output, i);
t.detach(); //t.join();
}
getchar();
return 0;
}
执行命令:g++ -std=c++0x test.cpp -pthread -o test
detach
的结果为:
join
的结果为:
可以发现,detach
方式,启动的线程自主在后台运行,当前的代码继续往下执行,不等待新线程结束。join
方式,等待启动的线程完成,才会继续往下执行。
并发链表
这里使用lock coupling方法进行并发处理,简单来说:
- 对于
Add
操作,每次需要锁住当前插入节点和前驱节点 - 对于
Remove
操作,每次需要锁住删除节点和前驱节点 Contain
操作需要锁吗?(可以不需要)
这其实是一种很严格的锁,每次操作从头往后遍历,处理每个节点时,其他线程都不能操作。
代码
Linkdedlist.h
#ifndef LINKEDLIST_H // include guard
#define LINKEDLIST_H
#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <fstream>
using namespace std;
class Node
{
public:
Node *next_;
mutex mutex_;
int key_;
int value_;
Node() : next_(nullptr){};
Node(int key, int value) : next_(nullptr), key_(key), value_(value){};
};
void Add(Node *head, Node *node)
{
if (node == nullptr || head == nullptr)
return;
printf("Inserting %d ....\n", node->key_);
// need to lock first
head->mutex_.lock();
Node *prev = head;
Node *current = head->next_;
while (current != nullptr)
{
current->mutex_.lock();
if (node->key_ < current->key_)
{
prev->next_ = node;
node->next_ = current;
printf("Interted %d success!\n", node->key_);
current->mutex_.unlock();
prev->mutex_.unlock();
return;
}
else if (node->key_ == current->key_)
{
printf("Inserting %d failed! duplicated key\n", node->key_);
current->mutex_.unlock();
prev->mutex_.unlock();
return;
}
else
{
prev->mutex_.unlock();
prev = current;
current = current->next_;
}
}
// insert to the last
printf("Interted %d success to the last!\n", node->key_);
prev->next_ = node;
prev->mutex_.unlock();
}
void Remove(Node *head, int key)
{
if (head == nullptr)
return;
printf("Romoving %d ....\n", key);
head->mutex_.lock();
Node *prev = head;
Node *current = head->next_;
while (current != nullptr && current->key_ < key)
{
current->mutex_.lock();
prev->mutex_.unlock();
prev = current;
current = current->next_;
}
if (current != nullptr && current->key_ == key)
{
// its important to lock current node to ensure delete safely
current->mutex_.lock();
prev->next_ = current->next_;
delete current;
prev->mutex_.unlock();
printf("Romove %d success!\n", key);
}
else
{
prev->mutex_.unlock();
printf("Romove %d failed! Key not exited!\n", key);
}
return;
}
bool Contain(Node *head, int key)
{
if (head == nullptr)
return false;
Node *prev = head;
Node *current = head->next_;
while (current != nullptr)
{
// prev->mutex_.lock();
// current->mutex_.lock();
if (current->key_ == key)
{
// current->mutex_.unlock();
// prev->mutex_.lock();
return true;
}
else if (current->key_ < key)
{
// prev->mutex_.unlock();
// current->mutex_.unlock();
prev = current;
current = current->next_;
}
else
return false;
}
return false;
}
void Print(Node *head)
{
head->mutex_.lock();
ofstream fout("result.txt");
fout << "----------PRINT----------" << endl;
for (Node *current = head->next_; current != nullptr; current = current->next_)
{
fout << current->key_ << ":" << current->value_ << endl;
}
head->mutex_.unlock();
fout << endl;
}
#endif
Test.cpp
#include <iostream>
#include <thread>
#include <vector>
#include "LinkedList.h"
using namespace std;
//test for add + remove thread
void ARTest(Node *head, Node *node, int dlkey)
{
Add(head, node);
Remove(head, dlkey);
}
//test for Remove + Add Thread
void RATest(Node *head, Node *node, int dlkey)
{
Remove(head, dlkey);
Add(head, node);
}
int main()
{
auto head = new Node(0, 0);
//test for AddThreads
vector<thread> AddThreads;
for (int i = 0; i < 1000; ++i)
{
auto node = new Node(i, i + 100);
AddThreads.emplace_back(Add, head, node);
}
for (auto &thread1 : AddThreads)
{
thread1.join();
}
// //test for RemoveThreads
// vector<thread> RemoveThreads;
// for(int i = 0; i < 1000; i = i + 2){
// RemoveThreads.emplace_back(Remove, head, i);
// }
// for (auto& thread : RemoveThreads) {
// thread.join();
// }
//test for Add + Remove Thread
// vector<thread> ARThreads;
// for(int i = 0; i < 1000; i = i + 2){
// auto node1 = new Node(i+1,i+100);
// ARThreads.emplace_back(ARTest,head,node1,i);
// }
// for (auto& thread : ARThreads) {
// thread.join();
// }
// test for Remove + Add Thread
vector<thread> RAThreads;
for (int i = 0; i < 1000; i = i + 2)
{
auto node2 = new Node(i + 1, i + 100);
RAThreads.emplace_back(RATest, head, node2, i);
}
for (auto &thread2 : RAThreads)
{
thread2.join();
}
Print(head);
return 0;
}
注意点
- 每次操作,需要首先锁住head头指针!!!
- 需要判断是否已经存在同样的key。
运行
执行g++ -std=c++0x testList.cpp -pthread
,结果保存在result.txt
文件中