synchronization_common.h
#include "HalideRuntime.h"
#include "printer.h"
#include "scoped_spin_lock.h"

/* This provides an implementation of pthreads-like mutex and
 * condition variables with fast default case performance. The code
 * is based on the "parking lot" design and specifically Amanieu
 * d'Antras' Rust implementation:
 * https://github.com/Amanieu/parking_lot
 * and the one in WTF:
 * https://webkit.org/blog/6161/locking-in-webkit/
 *
 * Neither of the above implementations was used directly, largely to
 * keep dependencies manageable. This implementation lacks a few features
 * relative to those two. Specifically, timeouts are not supported,
 * nor are optional fairness or deadlock detection.
 * This implementation should provide a fairly standalone "one file"
 * fast synchronization layer on top of readily available system primitives.
 *
 * TODO: Implement a pthread_once equivalent.
 * TODO: Add a read/write lock and move SharedExclusiveSpinLock from tracing.cpp
 * to this mechanism.
 * TODO: Add timeouts and optional fairness if needed.
 * TODO: Relying on condition variables has issues for old versions of Windows
 * and likely has portability issues to some very bare bones embedded OSes.
 * An implementation using only semaphores or event counters should
 * be doable.
 */
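
/* Usage sketch (illustrative only): the public entry points implemented at the
 * bottom of this file are the pthreads-like functions declared in
 * HalideRuntime.h. A typical condition-variable wait loop, assuming a
 * caller-provided queue_empty() predicate (hypothetical), looks like:
 *
 *     struct halide_mutex mutex = {{0}};
 *     struct halide_cond cond = {{0}};
 *
 *     halide_mutex_lock(&mutex);
 *     while (queue_empty()) {
 *         halide_cond_wait(&cond, &mutex);
 *     }
 *     // ... consume work ...
 *     halide_mutex_unlock(&mutex);
 *
 * The producer side takes the same mutex, updates the shared state, and calls
 * halide_cond_signal() or halide_cond_broadcast().
 */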

// Copied from tsan_interface.h
#ifndef TSAN_ANNOTATIONS
#define TSAN_ANNOTATIONS 0
#endif

#if TSAN_ANNOTATIONS
extern "C" {
const unsigned __tsan_mutex_linker_init = 1 << 0;
void __tsan_mutex_pre_lock(void *addr, unsigned flags);
void __tsan_mutex_post_lock(void *addr, unsigned flags, int recursion);
int __tsan_mutex_pre_unlock(void *addr, unsigned flags);
void __tsan_mutex_post_unlock(void *addr, unsigned flags);
void __tsan_mutex_pre_signal(void *addr, unsigned flags);
void __tsan_mutex_post_signal(void *addr, unsigned flags);
}
#endif

namespace Halide {
namespace Runtime {
namespace Internal {

namespace Synchronization {

namespace {

#if TSAN_ANNOTATIONS
ALWAYS_INLINE void if_tsan_pre_lock(void *mutex) {
    __tsan_mutex_pre_lock(mutex, __tsan_mutex_linker_init);
}
// TODO(zalman|dvyukov): Is 1 the right value for a non-recursive lock? Pretty sure the value is ignored.
ALWAYS_INLINE void if_tsan_post_lock(void *mutex) {
    __tsan_mutex_post_lock(mutex, __tsan_mutex_linker_init, 1);
}
// TODO(zalman|dvyukov): Is it safe to ignore the return value here if locks are not recursive?
ALWAYS_INLINE void if_tsan_pre_unlock(void *mutex) {
    (void)__tsan_mutex_pre_unlock(mutex, __tsan_mutex_linker_init);
}
ALWAYS_INLINE void if_tsan_post_unlock(void *mutex) {
    __tsan_mutex_post_unlock(mutex, __tsan_mutex_linker_init);
}
ALWAYS_INLINE void if_tsan_pre_signal(void *cond) {
    __tsan_mutex_pre_signal(cond, 0);
}
ALWAYS_INLINE void if_tsan_post_signal(void *cond) {
    __tsan_mutex_post_signal(cond, 0);
}
#else
ALWAYS_INLINE void if_tsan_pre_lock(void *) {
}
ALWAYS_INLINE void if_tsan_post_lock(void *) {
}
ALWAYS_INLINE void if_tsan_pre_unlock(void *) {
}
ALWAYS_INLINE void if_tsan_post_unlock(void *) {
}
ALWAYS_INLINE void if_tsan_pre_signal(void *) {
}
ALWAYS_INLINE void if_tsan_post_signal(void *) {
}
#endif

#ifdef BITS_32
ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
    return __sync_and_and_fetch(addr, val);
}

template<typename T>
ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
    return __sync_fetch_and_add(addr, val);
}

template<typename T>
ALWAYS_INLINE bool cas_strong_sequentially_consistent_helper(T *addr, T *expected, T *desired) {
    T oldval = *expected;
    T gotval = __sync_val_compare_and_swap(addr, oldval, *desired);
    *expected = gotval;
    return oldval == gotval;
}

ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

template<typename T>
ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return cas_strong_sequentially_consistent_helper(addr, expected, desired);
}

ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
    return __sync_fetch_and_and(addr, val);
}

template<typename T>
ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
    *val = *addr;
}

template<typename T>
ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
    __sync_synchronize();
    *val = *addr;
}

ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
    return __sync_or_and_fetch(addr, val);
}

ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
    *addr = *val;
}

template<typename T>
ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
    *addr = *val;
    __sync_synchronize();
}

ALWAYS_INLINE void atomic_thread_fence_acquire() {
    __sync_synchronize();
}

#else

ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
    return __atomic_and_fetch(addr, val, __ATOMIC_RELEASE);
}

template<typename T>
ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
    return __atomic_fetch_add(addr, val, __ATOMIC_ACQ_REL);
}

ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
}

template<typename T>
ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
}

ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
}

ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
}

ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
    return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
}

ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
    return __atomic_fetch_and(addr, val, __ATOMIC_RELEASE);
}

template<typename T>
ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
    __atomic_load(addr, val, __ATOMIC_RELAXED);
}

template<typename T>
ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
    __atomic_load(addr, val, __ATOMIC_ACQUIRE);
}

ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
    return __atomic_or_fetch(addr, val, __ATOMIC_RELAXED);
}

ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
    __atomic_store(addr, val, __ATOMIC_RELAXED);
}

template<typename T>
ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
    __atomic_store(addr, val, __ATOMIC_RELEASE);
}

ALWAYS_INLINE void atomic_thread_fence_acquire() {
    __atomic_thread_fence(__ATOMIC_ACQUIRE);
}

#endif
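
// For readers more familiar with <atomic>: these helpers are thin wrappers
// that spell out their memory orders in their names. A rough correspondence
// (illustrative sketch only, not used by this file):
//
//     std::atomic<uintptr_t> state;
//     uintptr_t expected = 0, desired = 1;
//     // atomic_cas_weak_acquire_relaxed(&state, &expected, &desired) ~=
//     state.compare_exchange_weak(expected, desired,
//                                 std::memory_order_acquire,
//                                 std::memory_order_relaxed);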

}  // namespace

class spin_control {
    // Everyone says this should be 40. Have not measured it.
    int spin_count = 40;

public:
    ALWAYS_INLINE bool should_spin() {
        if (spin_count > 0) {
            spin_count--;
        }
        return spin_count > 0;
    }

    ALWAYS_INLINE void reset() {
        spin_count = 40;
    }
};

// The low-order two bits of the state word are used for locking state.
static constexpr uint8_t lock_bit = 0x01;
static constexpr uint8_t queue_lock_bit = 0x02;
static constexpr uint8_t parked_bit = 0x02;
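
// State-word layout (a summary of the code below, for orientation only):
//   word_lock::state:  bit 0 = lock held, bit 1 = queue lock held; the
//                      remaining bits hold a pointer to the head of the
//                      queue of waiting word_lock_queue_data nodes.
//   fast_mutex::state: bit 0 = lock held, bit 1 = parked threads may exist;
//                      waiters live in the global parking-lot hash table,
//                      keyed by the mutex address, not in the state word.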
251 
253  thread_parker parker; // TODO: member or pointer?
254 
255  // This design is from the Rust parking lot implementation by Amanieu d'Antras.
256  // Comment from original:
257  //
258  // Linked list of threads in the queue. The queue is split into two parts:
259  // the processed part and the unprocessed part. When new nodes are added to
260  // the list, they only have the next pointer set, and queue_tail is null.
261  //
262  // Nodes are processed with the queue lock held, which consists of setting
263  // the prev pointer for each node and setting the queue_tail pointer on the
264  // first processed node of the list.
265  //
266  // This setup allows nodes to be added to the queue without a lock, while
267  // still allowing O(1) removal of nodes from the processed part of the list.
268  // The only cost is the O(n) processing, but this only needs to be done
269  // once for each node, and therefore isn't too expensive.
270 
274 };
275 
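// Worked example of the queue processing described above (illustrative only):
// after threads A, B and C block on the lock in that order, the state word
// points at C and only the next pointers are set:
//
//     state -> C -> B -> A      (A was first; A.tail == &A, C.tail == nullptr)
//
// unlock_full() walks the list with the queue lock held, setting B.prev = &C
// and A.prev = &B, then caches C.tail = &A. The oldest waiter (A) is now
// reachable and removable in O(1), and later wakeups reuse the cached
// prev/tail pointers instead of re-walking the list.
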
class word_lock {
    uintptr_t state = 0;

    void lock_full();
    void unlock_full();

public:
    ALWAYS_INLINE void lock() {
        if_tsan_pre_lock(this);

        uintptr_t expected = 0;
        uintptr_t desired = lock_bit;
        // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
        if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
            lock_full();
        }

        if_tsan_post_lock(this);
    }

    ALWAYS_INLINE void unlock() {
        if_tsan_pre_unlock(this);

        uintptr_t val = atomic_fetch_and_release(&state, ~(uintptr_t)lock_bit);
        // If another thread is currently queueing, that thread will ensure
        // it acquires the lock or wakes a waiting thread.
        bool no_thread_queuing = (val & queue_lock_bit) == 0;
        // Only need to do a wakeup if there are threads waiting.
        bool some_queued = (val & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0;
        if (no_thread_queuing && some_queued) {
            unlock_full();
        }

        if_tsan_post_unlock(this);
    }
};

WEAK void word_lock::lock_full() {
    spin_control spinner;
    uintptr_t expected;
    atomic_load_relaxed(&state, &expected);

    while (true) {
        if (!(expected & lock_bit)) {
            uintptr_t desired = expected | lock_bit;

            if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
                return;
            }
            continue;
        }

        if (((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0) && spinner.should_spin()) {
            halide_thread_yield();
            atomic_load_relaxed(&state, &expected);
            continue;
        }

        word_lock_queue_data node;

        node.parker.prepare_park();
        // TODO: set up prelinkage parking state

        word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
        if (head == nullptr) {
            node.tail = &node;
            // constructor set node.prev = nullptr;
        } else {
            // Mark the tail as nullptr. The unlock routine will walk the list and wake up
            // the thread at the end.
            // constructor set node.tail = nullptr;
            // constructor set node.prev = nullptr;
            node.next = head;
        }

        uintptr_t desired = ((uintptr_t)&node) | (expected & (queue_lock_bit | lock_bit));
        if (atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
            node.parker.park();
            spinner.reset();
            atomic_load_relaxed(&state, &expected);
        }
    }
}

WEAK void word_lock::unlock_full() {
    uintptr_t expected;
    atomic_load_relaxed(&state, &expected);

    while (true) {
        // If another thread is currently queueing, that thread will ensure
        // it acquires the lock or wakes a waiting thread.
        bool thread_queuing = (expected & queue_lock_bit);
        // Only need to do a wakeup if there are threads waiting.
        bool none_queued = (expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0;
        if (thread_queuing || none_queued) {
            return;
        }

        uintptr_t desired = expected | queue_lock_bit;
        if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
            break;
        }
    }

    while (true) {
        word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
        word_lock_queue_data *current = head;
        word_lock_queue_data *tail = current->tail;
        int times_through = 0;
        while (tail == nullptr) {
            word_lock_queue_data *next = current->next;
            halide_assert(nullptr, next != nullptr);
            next->prev = current;
            current = next;
            tail = current->tail;
            times_through++;
        }
        head->tail = tail;

        // If the lock is now locked, unlock the queue and have the thread
        // that currently holds the lock do the wakeup.
        if (expected & lock_bit) {
            uintptr_t desired = expected & ~(uintptr_t)queue_lock_bit;
            if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
                return;
            }
            atomic_thread_fence_acquire();
            continue;
        }

        word_lock_queue_data *new_tail = tail->prev;
        if (new_tail == nullptr) {
            bool continue_outer = false;
            while (!continue_outer) {
                uintptr_t desired = expected & lock_bit;
                if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
                    break;
                }
                if ((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0) {
                    continue;
                } else {
                    atomic_thread_fence_acquire();
                    continue_outer = true;
                }
            }

            if (continue_outer) {
                continue;
            }
        } else {
            head->tail = new_tail;
            atomic_and_fetch_release(&state, ~(uintptr_t)queue_lock_bit);
        }

        // TODO: The reason there are three calls here is that other things can happen between them.
        // Also it is not clear whether unpark_start has to return the mutex/flag used by unpark
        // and unpark_finish due to memory lifetime reasons.
        tail->parker.unpark_start();
        tail->parker.unpark();
        tail->parker.unpark_finish();
        break;
    }
}

struct queue_data {
    thread_parker parker;  // TODO: member or pointer?

    uintptr_t sleep_address = 0;
    queue_data *next = nullptr;
    uintptr_t unpark_info = 0;
};

// Must be a power of two.
constexpr int LOAD_FACTOR = 4;

struct hash_bucket {
    word_lock mutex;

    queue_data *head = nullptr;  // Is this queue_data or thread_data?
    queue_data *tail = nullptr;  // Is this queue_data or thread_data?
};

constexpr int HASH_TABLE_SIZE = MAX_THREADS * LOAD_FACTOR;
struct hash_table {
    hash_bucket buckets[HASH_TABLE_SIZE];
};
WEAK hash_table table;

constexpr int HASH_TABLE_BITS = 10;
static_assert((1 << HASH_TABLE_BITS) >= MAX_THREADS * LOAD_FACTOR);

#if 0
WEAK void dump_hash() {
    int i = 0;
    for (auto &bucket : table.buckets) {
        queue_data *head = bucket.head;
        while (head != nullptr) {
            print(nullptr) << "Bucket index " << i << " addr " << (void *)head->sleep_address << "\n";
            head = head->next;
        }
        i++;
    }
}
#endif

static ALWAYS_INLINE uintptr_t addr_hash(uintptr_t addr) {
    // Fibonacci hashing. The golden ratio is 1.9E3779B97F4A7C15F39...
    // in hexadecimal.
    if (sizeof(uintptr_t) >= 8) {
        return (addr * (uintptr_t)0x9E3779B97F4A7C15) >> (64 - HASH_TABLE_BITS);
    } else {
        return (addr * (uintptr_t)0x9E3779B9) >> (32 - HASH_TABLE_BITS);
    }
}

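// Worked note (not used by the code): the multiply-and-shift keeps only the
// top HASH_TABLE_BITS bits of the product, so the result always lies in
// [0, 1 << HASH_TABLE_BITS) == [0, 1024). Assuming MAX_THREADS * LOAD_FACTOR
// == 1024, as in the stock runtime, every hash is then a valid index into
// table.buckets; check_hash() below verifies this in debug builds.
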
#ifdef DEBUG_RUNTIME
// Any hash calculated by addr_hash() should be incapable of being outside this range.
ALWAYS_INLINE void check_hash(uintptr_t hash) {
    halide_assert(nullptr, hash < HASH_TABLE_SIZE);
}
#endif  // DEBUG_RUNTIME

WEAK hash_bucket &lock_bucket(uintptr_t addr) {
    uintptr_t hash = addr_hash(addr);

#ifdef DEBUG_RUNTIME
    check_hash(hash);
#endif

    // TODO: if resizing is implemented, loop, etc.
    hash_bucket &bucket = table.buckets[hash];

    bucket.mutex.lock();

    return bucket;
}

struct bucket_pair {
    hash_bucket &from;
    hash_bucket &to;

    ALWAYS_INLINE bucket_pair(hash_bucket &from, hash_bucket &to)
        : from(from), to(to) {
    }
};

WEAK bucket_pair lock_bucket_pair(uintptr_t addr_from, uintptr_t addr_to) {
    // TODO: if resizing is implemented, loop, etc.
    uintptr_t hash_from = addr_hash(addr_from);
    uintptr_t hash_to = addr_hash(addr_to);

#ifdef DEBUG_RUNTIME
    check_hash(hash_from);
    check_hash(hash_to);
#endif

    // Lock the bucket with the smaller hash first in order
    // to prevent deadlock.
    if (hash_from == hash_to) {
        hash_bucket &first = table.buckets[hash_from];
        first.mutex.lock();
        return bucket_pair(first, first);
    } else if (hash_from < hash_to) {
        hash_bucket &first = table.buckets[hash_from];
        hash_bucket &second = table.buckets[hash_to];
        first.mutex.lock();
        second.mutex.lock();
        return bucket_pair(first, second);
    } else {
        hash_bucket &first = table.buckets[hash_to];
        hash_bucket &second = table.buckets[hash_from];
        first.mutex.lock();
        second.mutex.lock();
        return bucket_pair(second, first);
    }
}

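// Example of the ordering above (illustrative only): if addr_from hashes to
// bucket 9 and addr_to to bucket 3, bucket 3 is locked first, then bucket 9,
// and the returned pair still has from == buckets[9], to == buckets[3].
// unlock_bucket_pair() below releases them in the reverse locking order by
// comparing the bucket addresses.
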
WEAK void unlock_bucket_pair(bucket_pair &buckets) {
    // In the lock routine, the buckets are locked smaller hash index first.
    // Here we reverse this ordering by comparing the pointers. This works
    // since the pointers are obtained by indexing an array with the hash
    // values.
    if (&buckets.from == &buckets.to) {
        buckets.from.mutex.unlock();
    } else if (&buckets.from > &buckets.to) {
        buckets.from.mutex.unlock();
        buckets.to.mutex.unlock();
    } else {
        buckets.to.mutex.unlock();
        buckets.from.mutex.unlock();
    }
}

struct validate_action {
    bool unpark_one = false;
    uintptr_t invalid_unpark_info = 0;
};

struct parking_control {
    uintptr_t park(uintptr_t addr);
    uintptr_t unpark_one(uintptr_t addr);
    int unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, uintptr_t unpark_info);

protected:
    virtual bool validate(validate_action &action) {
        return true;
    }
    virtual void before_sleep() {
        // nothing
    }
    virtual uintptr_t unpark(int unparked, bool more_waiters) {
        return 0;
    }
    virtual void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued) {
        // nothing
    }
};

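// Protocol sketch (a restatement of the member definitions below, not extra
// behavior):
//   park(addr):        lock addr's bucket and call validate(); on failure,
//                      unlock and return action.invalid_unpark_info. Otherwise
//                      append this thread to the bucket queue, unlock, call
//                      before_sleep(), block in the thread_parker, and return
//                      the unpark_info written by the waker.
//   unpark_one(addr):  dequeue the first waiter parked on addr, call
//                      unpark(1, more_waiters) to update the protected state,
//                      then wake that thread.
//   unpark_requeue(a, b, info): wake at most one waiter on a (when
//                      action.unpark_one is set) and move the remaining
//                      waiters onto b's queue, reporting the outcome via
//                      requeue_callback().
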
// TODO: Do we need a park_result thing here?
WEAK uintptr_t parking_control::park(uintptr_t addr) {
    queue_data queue_data;

    hash_bucket &bucket = lock_bucket(addr);

    validate_action action;
    if (!validate(action)) {
        bucket.mutex.unlock();
        return action.invalid_unpark_info;
    }

    queue_data.next = nullptr;
    queue_data.sleep_address = addr;
    queue_data.parker.prepare_park();
    if (bucket.head != nullptr) {
        bucket.tail->next = &queue_data;
    } else {
        bucket.head = &queue_data;
    }
    bucket.tail = &queue_data;
    bucket.mutex.unlock();

    before_sleep();

    queue_data.parker.park();

    return queue_data.unpark_info;

    // TODO: handle timeouts.
}

WEAK uintptr_t parking_control::unpark_one(uintptr_t addr) {
    hash_bucket &bucket = lock_bucket(addr);

    queue_data **data_location = &bucket.head;
    queue_data *prev = nullptr;
    queue_data *data = *data_location;
    while (data != nullptr) {
        uintptr_t cur_addr;
        atomic_load_relaxed(&data->sleep_address, &cur_addr);
        if (cur_addr == addr) {
            queue_data *next = data->next;
            *data_location = next;

            bool more_waiters = false;

            if (bucket.tail == data) {
                bucket.tail = prev;
            } else {
                queue_data *data2 = next;
                while (data2 != nullptr && !more_waiters) {
                    uintptr_t cur_addr2;
                    atomic_load_relaxed(&data2->sleep_address, &cur_addr2);
                    more_waiters = (cur_addr2 == addr);
                    data2 = data2->next;
                }
            }

            data->unpark_info = unpark(1, more_waiters);

            data->parker.unpark_start();
            bucket.mutex.unlock();
            data->parker.unpark();
            data->parker.unpark_finish();

            // TODO: Figure out ret type.
            return more_waiters ? 1 : 0;
        } else {
            data_location = &data->next;
            prev = data;
            data = data->next;
        }
    }

    unpark(0, false);

    bucket.mutex.unlock();

    // TODO: decide if this is the right return value.
    return 0;
}

WEAK int parking_control::unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, uintptr_t unpark_info) {
    bucket_pair buckets = lock_bucket_pair(addr_from, addr_to);

    validate_action action;
    if (!validate(action)) {
        unlock_bucket_pair(buckets);
        return 0;
    }

    queue_data **data_location = &buckets.from.head;
    queue_data *prev = nullptr;
    queue_data *data = *data_location;
    queue_data *requeue = nullptr;
    queue_data *requeue_tail = nullptr;
    queue_data *wakeup = nullptr;

    while (data != nullptr) {
        uintptr_t cur_addr;
        atomic_load_relaxed(&data->sleep_address, &cur_addr);

        queue_data *next = data->next;
        if (cur_addr == addr_from) {
            *data_location = next;

            if (buckets.from.tail == data) {
                buckets.from.tail = prev;
            }

            if (action.unpark_one && wakeup == nullptr) {
                wakeup = data;
            } else {
                if (requeue == nullptr) {
                    requeue = data;
                } else {
                    requeue_tail->next = data;
                }

                requeue_tail = data;
                atomic_store_relaxed(&data->sleep_address, &addr_to);
            }
            data = next;
            // TODO: prev ptr?
        } else {
            data_location = &data->next;
            prev = data;
            data = next;
        }
    }

    if (requeue != nullptr) {
        requeue_tail->next = nullptr;
        if (buckets.to.head == nullptr) {
            buckets.to.head = requeue;
        } else {
            buckets.to.tail->next = requeue;
        }
        buckets.to.tail = requeue_tail;
    }

    requeue_callback(action, wakeup != nullptr, requeue != nullptr);

    if (wakeup != nullptr) {
        wakeup->unpark_info = unpark_info;
        wakeup->parker.unpark_start();
        unlock_bucket_pair(buckets);
        wakeup->parker.unpark();
        wakeup->parker.unpark_finish();
    } else {
        unlock_bucket_pair(buckets);
    }

    return wakeup != nullptr && action.unpark_one;
}

struct mutex_parking_control final : public parking_control {
    uintptr_t *const lock_state;

    ALWAYS_INLINE mutex_parking_control(uintptr_t *lock_state)
        : lock_state(lock_state) {
    }

protected:
    bool validate(validate_action &action) final {
        uintptr_t result;
        atomic_load_relaxed(lock_state, &result);
        return result == (lock_bit | parked_bit);
    }

    uintptr_t unpark(int unparked, bool more_waiters) final {
        // TODO: consider handling fairness.
        uintptr_t return_state = more_waiters ? parked_bit : 0;
        atomic_store_release(lock_state, &return_state);
        return 0;
    }
};

class fast_mutex {
    uintptr_t state = 0;

    ALWAYS_INLINE void lock_full() {
        // Everyone says this should be 40. Have not measured it.
        spin_control spinner;
        uintptr_t expected;
        atomic_load_relaxed(&state, &expected);

        while (true) {
            if (!(expected & lock_bit)) {
                uintptr_t desired = expected | lock_bit;
                if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
                    return;
                }
                continue;
            }

            // Spin with spin count. Note that this occurs even if
            // threads are parked. We're prioritizing throughput over
            // fairness by letting sleeping threads lie.
            if (spinner.should_spin()) {
                halide_thread_yield();
                atomic_load_relaxed(&state, &expected);
                continue;
            }

            // Mark the mutex as having parked threads if not already done.
            if ((expected & parked_bit) == 0) {
                uintptr_t desired = expected | parked_bit;
                if (!atomic_cas_weak_relaxed_relaxed(&state, &expected, &desired)) {
                    continue;
                }
            }

            // TODO: consider handling fairness, timeout
            mutex_parking_control control(&state);
            uintptr_t result = control.park((uintptr_t)this);
            if (result == (uintptr_t)this) {
                return;
            }

            spinner.reset();
            atomic_load_relaxed(&state, &expected);
        }
    }

    ALWAYS_INLINE void unlock_full() {
        uintptr_t expected = lock_bit;
        uintptr_t desired = 0;
        // Try for a fast release of the lock. Redundant with code in lock, but done
        // to make unlock_full a standalone unlock that can be called directly.
        if (atomic_cas_strong_release_relaxed(&state, &expected, &desired)) {
            return;
        }

        mutex_parking_control control(&state);
        control.unpark_one((uintptr_t)this);
    }

public:
    ALWAYS_INLINE void lock() {
        uintptr_t expected = 0;
        uintptr_t desired = lock_bit;
        // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
        if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
            lock_full();
        }
    }

    ALWAYS_INLINE void unlock() {
        uintptr_t expected = lock_bit;
        uintptr_t desired = 0;
        // Try for a fast release of the lock bit. If this does not work, call the full adaptive looping code.
        if (!atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
            unlock_full();
        }
    }

    ALWAYS_INLINE bool make_parked_if_locked() {
        uintptr_t val;
        atomic_load_relaxed(&state, &val);
        while (true) {
            if (!(val & lock_bit)) {
                return false;
            }

            uintptr_t desired = val | parked_bit;
            if (atomic_cas_weak_relaxed_relaxed(&state, &val, &desired)) {
                return true;
            }
        }
    }

    ALWAYS_INLINE void make_parked() {
        atomic_or_fetch_relaxed(&state, parked_bit);
    }
};

struct signal_parking_control final : public parking_control {
    uintptr_t *const cond_state;
    fast_mutex *const mutex;

    ALWAYS_INLINE signal_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
        : cond_state(cond_state), mutex(mutex) {
    }

protected:
    uintptr_t unpark(int unparked, bool more_waiters) final {
        if (!more_waiters) {
            uintptr_t val = 0;
            atomic_store_relaxed(cond_state, &val);
        }

#if 0  // TODO: figure out why this was here.
        return (uintptr_t)mutex;
#else
        return 0;
#endif
    }
};

struct broadcast_parking_control final : public parking_control {
    uintptr_t *const cond_state;
    fast_mutex *const mutex;

    ALWAYS_INLINE broadcast_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
        : cond_state(cond_state), mutex(mutex) {
    }

protected:
    bool validate(validate_action &action) final {
        uintptr_t val;
        atomic_load_relaxed(cond_state, &val);
        // By the time this broadcast locked everything and was processed, the cond
        // has progressed to a new mutex; do nothing, since any waiting threads have
        // to be waiting on what is effectively a different condition.
        if (val != (uintptr_t)mutex) {
            return false;
        }
        // Clear the cond's connection to the mutex, as all waiting threads are going to requeue onto the mutex.
        val = 0;
        atomic_store_relaxed(cond_state, &val);
        action.unpark_one = !mutex->make_parked_if_locked();
        return true;
    }

    void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued) final {
        if (action.unpark_one && some_requeued) {
            mutex->make_parked();
        }
    }
};

struct wait_parking_control final : public parking_control {
    uintptr_t *const cond_state;
    fast_mutex *const mutex;

    ALWAYS_INLINE wait_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
        : cond_state(cond_state), mutex(mutex) {
    }

protected:
    bool validate(validate_action &action) final {
        uintptr_t val;
        atomic_load_relaxed(cond_state, &val);

        if (val == 0) {
            val = (uintptr_t)mutex;
            atomic_store_relaxed(cond_state, &val);
        } else if (val != (uintptr_t)mutex) {
            // TODO: signal error.
            action.invalid_unpark_info = (uintptr_t)mutex;
            return false;
        }

        return true;
    }

    void before_sleep() final {
        mutex->unlock();
    }

    uintptr_t unpark(int unparked, bool more_waiters) final {
        if (!more_waiters) {
            uintptr_t val = 0;
            atomic_store_relaxed(cond_state, &val);
        }
        return 0;
    }
};

class fast_cond {
    uintptr_t state = 0;

public:
    ALWAYS_INLINE void signal() {
        if_tsan_pre_signal(this);

        uintptr_t val;
        atomic_load_relaxed(&state, &val);
        if (val == 0) {
            if_tsan_post_signal(this);
            return;
        }
        signal_parking_control control(&state, (fast_mutex *)val);
        control.unpark_one((uintptr_t)this);
        if_tsan_post_signal(this);
    }

    ALWAYS_INLINE void broadcast() {
        if_tsan_pre_signal(this);
        uintptr_t val;
        atomic_load_relaxed(&state, &val);
        if (val == 0) {
            if_tsan_post_signal(this);
            return;
        }
        broadcast_parking_control control(&state, (fast_mutex *)val);
        control.unpark_requeue((uintptr_t)this, val, 0);
        if_tsan_post_signal(this);
    }

    ALWAYS_INLINE void wait(fast_mutex *mutex) {
        wait_parking_control control(&state, mutex);
        uintptr_t result = control.park((uintptr_t)this);
        if (result != (uintptr_t)mutex) {
            mutex->lock();
        } else {
            if_tsan_pre_lock(mutex);

            // TODO: this is debug only.
            uintptr_t val;
            atomic_load_relaxed((uintptr_t *)mutex, &val);
            halide_assert(nullptr, val & 0x1);

            if_tsan_post_lock(mutex);
        }
    }
};

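// Note on fast_cond (a restatement of the code above): state holds 0 when no
// thread is waiting, otherwise the address of the fast_mutex the waiters will
// reacquire. signal() wakes a single waiter via unpark_one(); broadcast()
// requeues the remaining waiters directly onto that mutex via unpark_requeue(),
// so they are woken one at a time as the mutex becomes available rather than
// all stampeding for it at once.
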
}  // namespace Synchronization

}  // namespace Internal
}  // namespace Runtime
}  // namespace Halide

extern "C" {

WEAK void halide_mutex_lock(halide_mutex *mutex) {
    Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
        (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
    fast_mutex->lock();
}

WEAK void halide_mutex_unlock(halide_mutex *mutex) {
    Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
        (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
    fast_mutex->unlock();
}

WEAK void halide_cond_broadcast(struct halide_cond *cond) {
    Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
        (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
    fast_cond->broadcast();
}

WEAK void halide_cond_signal(struct halide_cond *cond) {
    Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
        (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
    fast_cond->signal();
}

WEAK void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex) {
    Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
        (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
    Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
        (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
    fast_cond->wait(fast_mutex);
}

// Actual definition of the mutex array.
struct halide_mutex_array {
    struct halide_mutex *array;
};

WEAK halide_mutex_array *halide_mutex_array_create(int sz) {
    // TODO: If sz is huge, we should probably hash it down to something smaller
    // in the accessors below. Check for deadlocks before doing so.
    halide_mutex_array *array = (halide_mutex_array *)halide_malloc(
        nullptr, sizeof(halide_mutex_array));
    if (array == nullptr) {
        // Will result in a failed assertion and a call to halide_error.
        return nullptr;
    }
    array->array = (halide_mutex *)halide_malloc(
        nullptr, sz * sizeof(halide_mutex));
    if (array->array == nullptr) {
        halide_free(nullptr, array);
        // Will result in a failed assertion and a call to halide_error.
        return nullptr;
    }
    memset(array->array, 0, sz * sizeof(halide_mutex));
    return array;
}

WEAK void halide_mutex_array_destroy(void *user_context, void *array) {
    struct halide_mutex_array *arr_ptr = (struct halide_mutex_array *)array;
    halide_free(user_context, arr_ptr->array);
    halide_free(user_context, arr_ptr);
}

WEAK int halide_mutex_array_lock(struct halide_mutex_array *array, int entry) {
    halide_mutex_lock(&array->array[entry]);
    return 0;
}

WEAK int halide_mutex_array_unlock(struct halide_mutex_array *array, int entry) {
    halide_mutex_unlock(&array->array[entry]);
    return 0;
}
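
// Usage sketch (illustrative only; the names locks, hist and b below are
// hypothetical): the mutex array provides one lock per slot for fine-grained
// locking, e.g. guarding the bins of a shared histogram:
//
//     struct halide_mutex_array *locks = halide_mutex_array_create(256);
//     // in a worker, to update bin b:
//     halide_mutex_array_lock(locks, b % 256);
//     hist[b] += 1;
//     halide_mutex_array_unlock(locks, b % 256);
//     // when finished:
//     halide_mutex_array_destroy(nullptr, locks);
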
}