Bob Cheng
System Programming

Threads

Process Concept

  • 一個 process 代表正在執行的一個程式
    • OS kernel 用來排程的單位
  • 每個 process 互相獨立 : 有自己的 address space, open file table, pending signals, signal handler, etc.

Thread Concept

  • 一個 thread 代表在一個 process 內一連串的操作
  • 傳統的 UNIX process 都只有一個 thread (single-threaded process)
  • 現代的 UNIX 系統下,一個 process 裡面可以有多個 threads (multi-threaded process)
  • thread 又被稱做 lightweight process
    • 相同 process 的不同 threads 會共享資源
      shared by threads (of same process)for each thread
      Text, Global Data, Heap, Open fds, Environment variables, PID, PPID, UID, EUID, GID, EGID, File record locks, Signal, Pending alarms, Signal handlersThread ID, Stack, Signal mask (pthread_sigmask), An errno variable, Scheduling properties, Thread-specific data

以下的 threads 皆為 POSIX threads (pthreads)

Process v.s. Thread

ProcessThread
定義正在執行的 program在 process 內一連串的操作
使用資源較多較少
存活時間較長較短
記憶體可以和 parent 共享 (COW), 或完全不共享default : 共享相同的 address space

Benefits of Multi-threaded

  • 增加反應能力
  • 資源共享
  • 增加 program 的吞吐量
    • 各個獨立的 task (不依賴其他 tasks) 可以透過指派給不同的 threads 來交錯進行
  • 對於需要進行很多相似 task 的單一 program 很有用

Multithreading Models

不同的 models 差別在於 user threads 和 kernel threads 之間的關係

ULT and KLT

Many-to-one Model

多個 user threads 對應單一 kernel thread

  • kernel 不負責管理 threads (kernel 不知道有多個 threads 的存在,因為它一次只處裡一個 thread)
  • 由 threads library 負責管理和排程 threads (in user space)
  • 現在不太有系統採用此方式
  • 優點 :
    • 不用更改 kernel 來做 multithreading
  • 缺點 :
    • 只要有某個 thread 做了 slow system call,其他 threads 都會被 blocked
    • 多個 threads 不能平行運行在多個 CPUs 上

One-to-one Model

每個 user thread 都對應到一個 kernel thread

  • kernel 負責管理 threads
  • 要建立 user thread 必須建立對應的 kernel thread
  • Liunx 和 Windows 都採用此方式
  • 優點 :
    • 多個 threads 可以平行運行在多個 CPUs 上,不會互相 block
  • 缺點 :
    • 建立 thread 需要更多資源

POSIX Threads

POSIX threads (pthreads) : 規範 c programs 的 threads 的 POSIX 標準
pthreads 規範了 60 幾種函數來讓 program 可以操作 threads

運作方式
當一個 program 開始運行時,會先開啟一個 process 裡面只有一個 thread 叫做 main thread
在運行期間 main thread 會建立更多 peer threads

Thread context switch 可能由 library 或 kernel 執行 (取決於 model)

識別 thread : pthread_self(), pthread_equal()

pthread_t 資料型別用來表示 thread ID :

  • 只有在其所屬 process 底下的 thread ID 才有意義
  • POSIX.1 允許我們用其他資料型別來表示 thread ID
// pthread_self() always succeeds
pthread_t pthread_self(void);
// Returns: nonzero if equal, 0 otherwise
int pthread_equal(pthread_t tid1, pthread_t tid2);
  • 一個 thread 可以透過 pthread_self() 來獲取自己的 TID
  • pthread_equal() 用來比對兩個 TID 是否相同 (==不能直接用 ====)

建立 thread : pthread_create()

// Returns: 0 if OK, error number on failure
int pthread_create(pthread_t *tidp, pthread_attr_t *attr, void *(*start_rtn)(void *), void *arg);

pthread_create() 會建立新的 thread 並指定它要執行的流程

  • tidp : 指向用來儲存 TID 的 memory buffer
  • attr : 用來調整 thread 的不同屬性 (NULL : default)
  • start_rtn : 新的 thread ==最一開始要執行的 function== (==start routine==) 的位址
  • arg : 要傳給 start_rtn 的參數

main thread 會呼叫 pthread_create() 來建立新的 peer thread 來平行執行工作

main thread 和 peer thread 的執行順序不一定

pthread_t ntid;

void printids(const char *s){
	pid_t pid;
	pthread_t tid;
	pid = getpid();
	tid = pthread_self();
	printf("%s pid %lu tid %lu (0x%lx)\n", s, (unsigned long)pid,
	(unsigned long)tid, (unsigned long)tid);
}

void *thr_fn(void *arg){
	printids("new thread: ");
	return((void *)0);
}

int main(void){
	int err;
	err = pthread_create(&ntid, NULL, thr_fn, NULL);
	if (err != 0)
		err_exit(err, "can’t create thread");
	printids("main thread:");
	sleep(1);
	exit(0);
}
main thread: pid 20075 tid 1 (0x1)
new thread:  pid 20075 tid 2 (0x2)

Thread Termination

結束 thread : pthread_exit()

void pthread_exit(void *rval_ptr);

一個 thread 呼叫 pthread_exit() 來 exit

join a thread : pthread_join()

// Returns: 0 if OK, error number on failure
int pthread_join(pthread_t thread, void **rval_ptr);

By default,thread 在建立時會包含 joinable 屬性 :

  • 一個 joinable thread 可以被其他 threads 結束和接收
  • joinable thread 的資源直到被其他 thread 接收之前不會被釋放
  • 為了避免記憶體流失,每個 joinable thread 都需要透過其他 thread 呼叫 pthread_join() 來接收其資源

pthread_join() 用來等待 thread terminate 並接收它

  • pthread_join() 會 block caller thread 直到 thread terminate
  • rval_ptr :
    • thread 的 return value (exit status) : 如果 thread 從它的 start routine 返回 或 thread 呼叫 pthread_exit()
    • PTHREAD_CANCELED : 如果 thread 被 canceled
    • 設成 NULL 來忽略

    如果 rval_ptr 指向的結構是分配在 thread 的 stack 上面,此時存取該指標會造成 undefined behaviors

void *thr_fn1(void *arg){
	printf("thread 1 returning\n");
	return((void *)1);
}

void *thr_fn2(void *arg){
	printf("thread 2 exiting\n");
	pthread_exit((void *)2);
}

int main(void){
	pthread_t tid1, tid2;
	void *tret;

	pthread_create(&tid1, NULL, thr_fn1, NULL);
	pthread_create(&tid2, NULL, thr_fn2, NULL);

	pthread_join(tid1, &tret);
	printf("thread 1 exit code %ld\n", (long)tret);

	pthread_join(tid2, &tret);
	printf("thread 2 exit code %ld\n", (long)tret);

	exit(0);
}

detach a thread : pthread_detach()

// Returns: 0 if OK, error number on failure
int pthread_detach(pthread_t tid);

By default,一個 (joinable) thread 的 exit status 會被保留直到我們呼叫 pthread_join() 來接收它為止
我們可以用 pthread_detach() 來讓一個 thread 被 ==detached==,則它的資源會直接被 kernel 回收,不用再另外接收它
pthread_detach() 會讓 thread tid 被 detached

對一個 detached thread 呼叫 pthread_join() 或是 pthread_detach() 都會導致 undefined behaviors

Thread Cancellation

cancel a thread : pthread_cancel()

// Returns: 0 if OK, error number on failure
int pthread_cancel(pthread_t tid);

一個 thread 可以透過呼叫 pthread_cancel() 來讓所屬相同 process 的某個 thread tid 被 ==cancel== (也就是 terminate)

  • 相當於 thread tid 自己呼叫 pthread_exit()
  • thread tid 可以選擇被 cancel 的方式 或 直接忽略
    • 取決於兩個 thread 屬性 : ==cancelability state== 和 ==cancelability type==
  • pthread_cancel() 只負責"要求” → 不會 block 或是等待該 thread terminate
    • 只有一個方法可以確認 thread 是否被 cancel,呼叫 pthread_join() 並且 exit status 為 ==PTHREAD_CANCELED==

在 Linux 裡面,當 thread tid 接收到 cancel request 時,會做以下事情

  1. 呼叫 Clean-up handlers
  2. 呼叫 data destructors (thread-specific)
  3. thread terminate

設定 cancel state : pthread_setcancelstate()

// Returns: 0 if OK, error number on failure
int pthread_setcancelstate(int state, int *oldstate);

cancelability state 有兩種 :

  • PTHREAD_CANCEL_ENABLE (預設值) : thread 可以被 cancel
  • PTHREAD_CANCEL_DISABLE : thread 不能被 cancel,如果收到 cancel request,則 thread 會 block 直到 state 變成 enable

設定 cancel type : pthread_setcanceltype()

// Returns: 0 if OK, error number on failure
int pthread_setcanceltype(int type, int *oldtype);

cancelability type 有兩種 :

  • PTHREAD_CANCEL_ASYNCHRONOUS : thread 可以隨時被 cancel
  • PTHREAD_CANCEL_DEFERRED (預設值) : cancel request 會 pending 直到 thread 下次呼叫可以做為 cancellation point 的 function

建立 cancellation point : pthread_testcancel()

void pthread_testcancel(void);

pthread_testcancel() 會在 caller thread 建立 cancellation point
所以即便當 thread 正在執行不包含 cancellation point 的程式碼時,也可以回應 cancel request

如果 cancel state == PTHREAD_CANCEL_DISABLE 或是 現在沒有 cancel request 正在 pending,則 pthread_testcancel() 沒有任何作用

type \ statePTHREAD_CANCEL_ENABLEPTHREAD_CANCEL_DISABLE
PTHREAD_CANCEL_DEFERREDcancel request pending until thread call cancellation pointthread is blockec until state becomes enable
PTHREAD_CANCEL_ASYNCHRONOUSthread is canceled immediately

Clean Handler

void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);

pthread_cleanup_push() 用來註冊 thread cleanup handler rtn

  • arg : rtn 的參數
  • 一個 thread 可以註冊多個 handlers
  • cleanup handlers 會被儲存在 stack 裡面,並依照註冊的順序的相反被呼叫

pthread_cleanup_pop() 會將 stack 最頂層的 handler 移除

  • 如果 execute ≠ 0 則會在移除時呼叫該 handler

當以下三種情況發生時,cleanup handler 會被自動呼叫:

  • 呼叫 pthread_exit()
  • 回應 cancel request
  • 呼叫 pthread_cleanup_pop() 而且 execute ≠ 0
void cleanup(void *arg){
  printf("cleanup: %s\n", (char *)arg);
}

void *thr_fn1(void *arg){
  printf("thread 1 start\n");
  pthread_cleanup_push(cleanup, "thread 1 first handler");
  pthread_cleanup_push(cleanup, "thread 1 second handler");
  printf("thread 1 push complete\n");
  if (arg)
    return((void *)1);
  pthread_cleanup_pop(0);
  pthread_cleanup_pop(0);
  return((void *)1);
}

void *thr_fn2(void *arg){
  printf("thread 2 start\n");
  pthread_cleanup_push(cleanup, "thread 2 first handler");
  pthread_cleanup_push(cleanup, "thread 2 second handler");
  printf("thread 2 push complete\n");
  if (arg)
    pthread_exit((void *)2);
  pthread_cleanup_pop(0);
  pthread_cleanup_pop(0);
  pthread_exit((void *)2);
}

int main(void){
  pthread_t tid1, tid2;
  void *tret;

  pthread_create(&tid1, NULL, thr_fn1, (void *)1);
  pthread_create(&tid2, NULL, thr_fn2, (void *)1);

  pthread_join(tid1, &tret);
  printf("thread 1 exit code %ld\n", (long)tret);

  pthread_join(tid2, &tret);
  printf("thread 2 exit code %ld\n", (long)tret);
  exit(0);
}
thread 1 start
thread 1 push complete
thread 2 start
thread 2 push complete
cleanup: thread 2 second handler
cleanup: thread 2 first handler
thread 1 exit code 1
thread 2 exit code 2

因為 thread 1 由它的 start routine 返回,因此不會呼叫 cleanup handler

Thread Attributes

NameDescirption
detachstatedetached thread attribute
guardsizeguard buffer size in bytes at end of thread state
stackaddrlowest address of thread stack
stacksizeminimum size in bytes of thread stack

建立 thread 屬性物件 : pthread_attr_init(), pthread_attr_destory()

// Returns: 0 if OK, error number on failure
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);

pthread_attr_init() 會將 attr 的所有屬性設定成預設值

  • 之後可以再利用其他 function 來調整各個屬性的值
  • attr 之後可以被 pthread_create() 使用

一旦某個 attr 不再需要,必須使用 pthread_attr_destroy() 來摧毀 attr 並收回記憶體空間

  • attr 必須重新使用 pthread_attr_init() 才能再次使用
  • 不會對先前使用該 attr 的 thread 造成影響

thread detachstate attr : pthread_attr_getdetachstate(), pthread_attr_setdetachstate()

// Returns: 0 if OK, error number on failure
int pthread_attr_getdetachstate(const pthread_attr_t *attr, int *detachstate);
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);

pthread_attr_setdetachstate() 會將 attrdetachstate 屬性設定成 detachstate 的值

  • detachstate 用來指定 thread 的起始狀態 detached (PTHREAD_CREATE_DETACHED) 或是 joinable (PTHREAD_CREAT_JOINABLE)

pthread_attr_getdetachstate() 會將 attr 裡面的 detachstate 屬性儲存到 detachstate 裡面

Thread Synchronization

由於多個 threads 會共用同個 process address space

  • 每個 thread 都可能對 shared memory 做讀寫
  • 同時讀取不會造成問題
  • 但同時寫入可能會引發 race condition
int var = 0;

void *thr_fn(void *arg){
  var++;
}

int main(void){
  int err, i;
  pthread_t id[2];

  for (i = 0; i < 2; i++) {
    err = pthread_create(&id[i], NULL, thr_fn, NULL);
    if (err != 0)
      err_exit(err, "can’t create thread”);
  }

  for (i = 0; i < 2; i++) {
    pthread_join(id[i], NULL);
  }

  exit(0);
}

以組合語言的角度來看, var++ 其實由三個步驟組成 :

  1. 從記憶體讀取 var 到 register
  2. 在 register 裡面將該值加上 1
  3. 將新的值從 register 寫入到 var 的記憶體位置 因此實際上程式的執行會如下 (沒有做同步) :

Thread Mutex

互斥鎖 (mutex = mutual exclusion) 的目標是讓一次只有一個 thread 可以存取 shared memory

  • 基本和 lock 的運作原理一樣
  • 一個 thread 在存取 shared memory 之前 acquire mutex;在結束之後 release mutex
  • 當一個 mutex 被上鎖時,其他嘗試獲取該 mutex 的 threads 會被 block 直到該 mutex 被釋放
    • 一旦該 mutex 被釋放,所有在等待的 threads 將會 unblock,==第一個執行的 thread== 會獲取該 mutex
    • 其他 threads 則會再確認一次 mutex 的狀態,並回到 block 的狀態等待下一次釋放

建立 mutex : pthread_mutex_init(), pthread_mutex_destroy()

// the following returns 0 if OK, error number on failure
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

pthread_mutex_t 用來表示一個 mutex

有兩個方法可以初始化一個 mutex :

  1. mutex = PTHREAD_MUTEX_INITIALIZER
  2. pthread_mutex_init() 來初始化 mutex 並設定屬性 attr (NULL : default)

建立 mutex 屬性物件 : pthread_mutexattr_init(), pthread_mutexattr_destroy()

// the following returns 0 if OK, error number on failure
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);

pthread_mutexattr_init() 會將 mutex 屬性物件 attr 初始化成預設值
pthread_mutexattr_destroy() 會摧毀 mutex 屬性物件 attr

mutex process-shared attr : pthread_mutexattr_getpshared(), pthread_mutexattr_setpshared()

// the following returns 0 if OK, error number on failure
int pthread_mutexattr_getpshared(pthread_mutexattr_t *attr);
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared);

process-shared 有兩種值 :

  • PTHREAD_PROCESS_PRIVATE (預設值) : mutex 只能被和建立者 thread 所屬相同 process 的 threads 操作
  • PTHREAD_PROCESS_SHARED : mutex 可以被所屬不同 process 的 threads 操作

使用方法

// all returns 0 if OK, error number on failure
int pthread_mutex_lock(pthread_mutext_t *mutex);
int pthread_mutex_trylock(pthread_mutext_t *mutex);
int pthread_mutex_unlock(pthread_mutext_t *mutex);

pthread_mutex_lock() & pthread_mutex_trylock() : 對 mutex 上鎖

  • 如果 mutex 是 ==unlocked==,兩者都會直接將其上鎖並回傳 0
  • 如果 mutex 已經 ==locked== ==:==
    • pthread_mutex_lock() : caller thread 會 block 直到其 unlocked
    • pthread_mutex_trylock() : 立即回傳 EBUSY

pthread_mutex_unlock() : 將 mutex 解鎖

  • 如果有多個 threads 被 mutex blocking,當呼叫 pthread_mutex_unlock() 後,第一個執行的 thread 會獲得該 mutex,其他 threads 則會繼續 blocking
  • 只能對已持有的 mutex 進行解鎖,否則會導致 undefined behavior
struct foo {
  int f_count;
  pthread_mutex_t f_lock;
  int f_id;
  /* ... more stuff here ... */
};

/* allocate the object */
  struct foo *foo_alloc(int id){
  struct foo *fp;
  if ((fp = malloc(sizeof(struct foo))) != NULL) {
    fp->f_count = 1;
    fp->f_id = id;
    if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
      free(fp);
      return(NULL);
    }
    /* ... continue initialization ... */
  }
  return(fp);
}

/* add a reference to the object */
void foo_hold(struct foo *fp){
  pthread_mutex_lock(&fp->f_lock);
  fp->f_count++;
  pthread_mutex_unlock(&fp->f_lock);
}

/* release a reference to the object */
void foo_rele(struct foo *fp){
pthread_mutex_lock(&fp->f_lock);
  if (--fp->f_count == 0) { /* last reference */
    pthread_mutex_unlock(&fp->f_lock);
    pthread_mutex_destroy(&fp->f_lock);
    free(fp);
  } else {
    pthread_mutex_unlock(&fp->f_lock);
  }
}

Deadlock

在先前的 process control 有提到,同樣地,在使用 mutex 時也可能發生 deadlock,舉例來說

  • 一個 thread 對自己上鎖的 mutex 再上鎖一次
    • 可以設定以下屬性來避免 self-deadlocking pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK);
  • 一個 thread 和另一個 thread 以相反的順序對多個 mutexes 上鎖

Example of Avoiding Deadlock

只要每個 thread 都以相同的順序對每個 mutex 上鎖就不會發生 deadlock

#include <stdlib.h>
#include <pthread.h>
#define NHASH 29
#define HASH(id) (((unsigned long)id)%NHASH)

struct foo *fh[NHASH];

pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

struct foo {
  int f_count; /* protected by hashlock */
  pthread_mutex_t f_lock;
  int f_id;
  struct foo *f_next; /* protected by hashlock */
  /* ... more stuff here ... */
};

/* allocate the object */
struct foo *foo_alloc(int id){
  struct foo *fp;
  int idx;

  if ((fp = malloc(sizeof(struct foo))) != NULL) {
    fp->f_count = 1;
    fp->f_id = id;
    if (pthread_mutex_init(&fp->f_lock, NULL) != 0) {
      free(fp);
      return(NULL);
    }
    idx = HASH(id);
    pthread_mutex_lock(&hashlock);
    fp->f_next = fh[idx];
    fh[idx] = fp;
    pthread_mutex_lock(&fp->f_lock);
    pthread_mutex_unlock(&hashlock);
    /* ... continue initialization ... */
    pthread_mutex_unlock(&fp->f_lock);
  }
  return(fp);
}

/* add a reference to the object */
void foo_hold(struct foo *fp){
  pthread_mutex_lock(&hashlock);
  fp->f_count++;
  pthread_mutex_unlock(&hashlock);
}


/* find an existing object */
struct foo *foo_find(int id){
  struct foo *fp;
  pthread_mutex_lock(&hashlock);
  for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) {
    if (fp->f_id == id) {
      fp->f_count++;
      break;
    }
  }
  pthread_mutex_unlock(&hashlock);
  return(fp);
}

/* release a reference to the object */
void
foo_rele(struct foo *fp){
  struct foo *tfp;
  int idx;
  pthread_mutex_lock(&hashlock);
  if (--fp->f_count == 0) { /* last reference, remove from list */
    idx = HASH(fp->f_id);
    tfp = fh[idx];
    if (tfp == fp) {
      fh[idx] = fp->f_next;
    } else {
      while (tfp->f_next != fp)
        tfp = tfp->f_next;
      tfp->f_next = fp->f_next;
    }
    pthread_mutex_unlock(&hashlock);
    pthread_mutex_destroy(&fp->f_lock);
    free(fp);
  } else {
    pthread_mutex_unlock(&hashlock);
  }
}

Reader-Writer Lock

除了 mutex,pthread 還有提供另一種鎖 — 讀寫鎖

讀寫鎖有三種狀態 :

stateRequest wead lockRequest write lock
unlocked==OK====OK==
read mode==OK====Block==
write mode==Block====Block==
  • 同一時間只有一個 thread 可以持有 write mode 的讀寫鎖
  • 同一時間可以有多個 threads 持有 read mode 的讀寫鎖

讀寫鎖相較於 mutex 更適合用在==讀取需求大於修改需求==的資料

建立讀寫鎖 : pthread_rwlock_init(), pthread_rwlock_destroy()

// the following returns 0 if OK, error number on failure
int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

pthread_rwlock_t 用來表示一個 rwlock

有兩個方法可以初始化一個 rwlock :

  1. ==pthread_rwlock_t== ==rwlock === PTHREAD_RWLOCK_INITIALIZER;
  2. pthread_rwlock_init() 來初始化 rwlock 並設定屬性 attr (NULL : default)

使用方法

// the following returns 0 if OK, error number on failure
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

pthread_rwlock_rdlock() :rwlock 上鎖成 read mode
pthread_rwlock_wrlock() : 將 rwlock 上鎖成 write mode
pthread_rwlock_unlock() : 將 rwlock 解鎖
pthread_rwlock_tryrdlock(), pthread_rwlock_trywrlock() 用來確認是否可以上鎖,如果不行則返回 EBUSY

Condition Variables

假設 thread A 只有當某個 condition 為真時,才會繼續執行,而這個 condition 需要由另一個 thread B 來改變,我們要如何讓這兩個 thread 之間可以互相溝通呢?

condition variable 是一種特殊的全域(同步)變數,需要和 mutex 一併操作

  • thread 需要獲取 mutex 才能查看、更動 condition variable

condition variable 主要有兩種操作

  • a thread release its mutex and block waits on the condition variable
  • (another) thread changes the condition and notifies (by signal or broadcast) the condition variable to unblock the waiting thread

建立條件變數 : pthread_cond_init(), pthread_cond_destroy()

// the following returns 0 if OK, error number on failure
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cond)

老生常談,pthread_cond_t 用來表示一個 cond

有兩個方法可以初始化一個 cond :

  1. ==pthread_cond_t== ==cond === PTHREAD_COND_INITIALIZER;
  2. pthread_cond_init() 來初始化 cond 並設定屬性 attr (NULL : default)

建立條件變數屬性物件 : pthread_cond_attrinit(), pthread_condattr_destroy()

// the following returns 0 if OK, error number on failure
int pthread_condattr_init(pthread_condattr_t *attr);
int pthread_condattr_destroy(pthread_condattr_t *attr);

pthread_condattr_init() 會將 cond 屬性物件 attr 初始化成預設值
pthread_condattr_destroy() 會摧毀 cond 屬性物件 attr
一樣也有 process-shared 屬性 (方法和 mutex 一樣,依樣畫葫蘆)

wait 條件變數 : pthread_cond_wait()

// the following returns 0 if OK, error number on failure
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *tsptr);

pthread_cond_wait() 會將 caller thread block 住,並等待條件變數 cond 被 signaled 或 broadcasted

  • mutex 必須是 caller thread 持有,否則會導致 undefined behavior
  • atomically unlocks mutex and wait
  • 當條件成立時,pthread_cond_wait() 會返回,此時 mutex 應該要被 caller thread 上鎖

pthread_cond_timedwait()pthread_cond_wait() 的運作方式一樣,不過當 tsptr 的時間到達時,會返回錯誤

signal and broadcast 條件變數 : pthread_cond_signal(), pthread_cond_broadcast()

// the following returns 0 if OK, error number on failure
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

pthread_cond_signal() 會 unblock 至少一個被 cond block 住的 threads
pthread_cond_broadcast() 會 unblock 所有被 cond block 住的 threads
如果沒有任何 thread 被 cond block 住,則兩個函數不會造成影響

注意

  • 當多個 threads 被同個條件變數 block 住時
    • 這些 threads 被 unblock 時會去爭奪 mutex
    • sechduler 會決定哪個 thread 先執行,這個 thread 就會獲得該 mutex,其他 thread 需要繼續等待下一次 unblock
  • pthread_cond_signal()pthread_cond_broadcast() 可以被非 mutex 持有者呼叫
    • 如果程式需要 predictable scheduling behavior,則 mutex 應該要由呼叫 pthread_cond_signal()pthread_cond_broadcast() 的 thread 給上鎖

範例:

Thread 1
pthread_cond_t ready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
int cond = 0;
.. ..
pthread_mutex_lock(&lock);
while (!cond)
  pthread_cond_wait(&ready, &lock);
pthread_mutex_unlock(&lock);

Thread 2
pthread_mutex_lock(&lock);
pthread_cond_signal(&ready);
pthread_mutex_unlock(&lock);

Spurious wakeup

當一個 thread 在 wait 條件變數時被喚醒,卻發現 condition 根本沒有被滿足
原因:在條件變數被 signaled 和 waiting thread 執行之間,有其他 thread 改動了該 condition
解法 : 用迴圈包住 wait,在每次醒來後都再次確認 condition 是否滿足

Spin Lock

當使用 pthread_mutex_lock() 時,如果無法順利獲取 mutex, thread 可能會被無限 block
→ 可以使用 spin lock

spin lock 的使用方式等和 mutex 基本上一樣,但會以 busy-waiting 的方式來 block

// the following returns 0 if OK, error number on failure
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
// all returns 0 if OK, error number on failure
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);

然而我們需要注意不能讓 spin lock 維持太長時間,以免消耗過多資源

更多 Thread

Thread and fork()

當一個 thread 呼叫了 fork(),產生出的 child process 只會有一個 thread。
該 child process 會繼承 mutex、reader-writer lock、conditional variable 等性質

問題 :
當 parent process 內的 threads 持有任何鎖,則 child process 也會持有這些鎖。然而 child process 內只有一個 thread (呼叫 fork() 的那個的複製),因此 child process 無法得知其他 threads (沒有呼叫 fork() 的那些) 持有的鎖,也無法解除這些鎖。

解法 :

  1. 立即呼叫 exec() (因為記憶體空間是新的,因此鎖不會繼承)
  2. 建立 fork handler 來將這些鎖解除

註冊 fork handler : pthread_atfork()

// returns 0 if OK, error number on failure
int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void))

pthread_atfork() 會註冊 fork handlers,該函數會在呼叫 fork() 時被執行

  • prepare : 在 fork() 建立 child process ==之前==由 parent process 執行的函數
  • parent : 在 fork() 建立 child process ==之後==,fork() 返回==之前==,由 parent process 執行的函數
  • child : 在 fork() 返回之前,由 child process 執行的函數
  • 三個函數都可以設成 NULL 來忽略

Thread and exec()

當一個包含多個 threads 的 process 呼叫 exec() 時,除了呼叫的 thread 之外,其他的 threads 都會立即被 terminate。

Thread and Signals

  • threads 之間會共享 signal handlers,不會共享 signal mask
  • 每個 thread 都有自己的 signal mask,繼承自呼叫 pthread_create() 的 thread
  • 所屬相同 process 的 threads 共享 signal dispositions
    • 任何 thread 都可以更改這些 dispostions
  • signals 實際上是被傳遞給 process 內的指定 thread (不是給整個 process)
    • 與硬體故障相關的 signals 會傳遞給造成問題的 thread
    • 其他 signals 則是傳給隨機的一個 thread

設定 thread 的 signal mask : pthread_sigmask()

// all returns 0 if OK, error number on failure
int pthread_sigmask(int how, const sigset_t *set, sigset_t *oset);

pthread_sigmask() 就像是 thread 版的 sigmask()

等待信號 : sigwait()

// all returns 0 if OK, error number on failure
int sigwait(const sigset_t *set, int *signo);

sigwait() 會 block 住,直到 set 裡面的某個信號被 delivered

  • set : 要等待的信號
  • signo : 儲存接收到的信號

注意

  • 如果 sigwait() 等待的信號有 handler,則該 handler 不會被執行,也就是我們要另外對 signo 做處理
  • 在呼叫 sigwait() 必須把 set 包含的信號 block 住 (好反直覺?)
  • sigwait() 會 atomically unblock set 裡面的信號,然後等待某個其中的信號被 delivered
    • 接著 sigwait() 會將該信號從 process 的 pending set 中移除,並復原 thread 的 signal mask
    • 也就是說 sigwait() 會從 process 取走特定的信號
  • 如果多個 threads 都對同個信號呼叫了 sigwait(),則當該信號被 delivered 時,只有一個 thread 會的 sigwait() 會返回

傳遞信號給 thread : sigkill()

// returns 0 if OK, error number on failure
int pthread_kill(pthread_t thread, int signo);

sigkill() 會向 thread 發送信號 signo

如果該信號是結束 process,則該 thread 所屬的 process 會被結束

Thread and I/O

記得用 pread()pwrite() 來避免 threads 之間的 race condition

Thread-safe

當一個函數可以同時被多個 threads 呼叫且不會發生問題時,我們說這個函數是 thread-safe
一個函數可以同時是 thread-safe 和 reentrant,也可以都不是

thread-safe but not reentrant