// xenium — michael_scott_queue.hpp
1 //
2 // Copyright (c) 2018-2020 Manuel Pöter.
3 // Licensed under the MIT License. See LICENSE file in the project root for full license information.
4 //
5 
6 #ifndef XENIUM_MICHAEL_SCOTT_QUEUE_HPP
7 #define XENIUM_MICHAEL_SCOTT_QUEUE_HPP
8 
9 #include <xenium/acquire_guard.hpp>
10 #include <xenium/backoff.hpp>
11 #include <xenium/parameter.hpp>
12 #include <xenium/policy.hpp>
13 
14 #ifdef _MSC_VER
15 #pragma warning(push)
16 #pragma warning(disable: 4324) // structure was padded due to alignment specifier
17 #endif
18 
19 namespace xenium {
// An unbounded lock-free multi-producer/multi-consumer FIFO queue
// (Michael/Scott algorithm). The internal list always contains at least one
// "dummy" node; `head` points at the dummy and `tail` points at (or lags at
// most one node behind) the last node.
// NOTE(review): the class declaration line itself (michael_scott_queue.hpp:37)
// is missing from this extract.
36 template <class T, class... Policies>
38 public:
// Type of the elements stored in the queue.
39  using value_type = T;
// Reclamation scheme policy; mandatory (enforced by the static_assert below).
40  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
// Backoff strategy applied after a failed CAS; defaults to the no-op no_backoff.
41  using backoff = parameter::type_param_t<policy::backoff, no_backoff, Policies...>;
42 
// Rebind this queue type with additional/overriding policies. NewPolicies
// come first so they take precedence over the already-set Policies.
43  template <class... NewPolicies>
44  using with = michael_scott_queue<T, NewPolicies..., Policies...>;
45 
46  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");
47 
50 
// Enqueue `value` at the tail; the element is taken by value and moved into
// a freshly allocated node.
57  void push(T value);
58 
// Dequeue the oldest element into `result`; returns false when the queue
// appeared empty (no element was popped).
66  [[nodiscard]] bool try_pop(T &result);
67 
68 private:
69  struct node;
70 
// Reclaimer-provided pointer types: concurrent_ptr is the shared atomic
// slot, marked_ptr its plain (possibly tag-bearing) value type, and
// guard_ptr a guarded reference — presumably it protects the pointee from
// reclamation while held; confirm against the reclaimer's documentation.
71  using concurrent_ptr = typename reclaimer::template concurrent_ptr<node, 0>;
72  using marked_ptr = typename concurrent_ptr::marked_ptr;
73  using guard_ptr = typename concurrent_ptr::guard_ptr;
74 
// Intrusive singly linked list node, managed by the reclamation scheme.
75  struct node : reclaimer::template enable_concurrent_ptr<node>
76  {
// Default constructor builds the initial dummy node with a
// value-initialized payload.
77  node() : value() {};
78  node(T&& v) : value(std::move(v)) {}
79 
80  T value;
81  concurrent_ptr next;
82  };
83 
// head and tail are placed on separate 64-byte boundaries to avoid false
// sharing between producers (tail) and consumers (head).
84  alignas(64) concurrent_ptr head;
85  alignas(64) concurrent_ptr tail;
86 };
87 
// Construct an empty queue: allocate the initial dummy node and point both
// head and tail at it. Relaxed stores suffice because the constructor cannot
// run concurrently with any other operation on this object.
// NOTE(review): the constructor signature line (michael_scott_queue.hpp:89)
// is missing from this extract.
88 template <class T, class... Policies>
90 {
91  auto n = new node();
92  head.store(n, std::memory_order_relaxed);
93  tail.store(n, std::memory_order_relaxed);
94 }
95 
96 template <class T, class... Policies>
97 michael_scott_queue<T, Policies...>::~michael_scott_queue()
98 {
99  // (1) - this acquire-load synchronizes-with the release-CAS (11)
100  auto n = head.load(std::memory_order_acquire);
101  while (n)
102  {
103  // (2) - this acquire-load synchronizes-with the release-CAS (6)
104  auto next = n->next.load(std::memory_order_acquire);
105  delete n.get();
106  n = next;
107  }
108 }
109 
// Enqueue (classic MS-queue two-step enqueue): allocate a node holding the
// moved-in value, link it behind the last node with a CAS on that node's
// `next`, then swing `tail` forward. If `tail` lags behind (another thread
// linked a node but has not yet advanced `tail`), help advance it first.
// NOTE(review): the function signature line (michael_scott_queue.hpp:111)
// is missing from this extract.
110 template <class T, class... Policies>
112 {
113  node* n = new node(std::move(value));
114 
115  backoff backoff;
116 
// Guarded reference to the observed tail — presumably keeps the node safe
// from reclamation while we dereference it below; see the reclaimer docs.
117  guard_ptr t;
118  for (;;)
119  {
120  // Get the old tail pointer.
121  // (3) - this acquire-load synchronizes-with the release-CAS (5, 7, 10)
122  t.acquire(tail, std::memory_order_acquire);
123 
124  // Help update the tail pointer if needed.
125  // (4) - this acquire-load synchronizes-with the release-CAS (6)
126  auto next = t->next.load(std::memory_order_acquire);
127  if (next.get() != nullptr)
128  {
// Tail is lagging: a node is already linked after `t` but `tail` has not
// been advanced yet. Help it along, then retry with a fresh tail.
129  marked_ptr expected(t.get());
130  // (5) - this release-CAS synchronizes-with the acquire-load (3)
131  tail.compare_exchange_weak(expected, next, std::memory_order_release, std::memory_order_relaxed);
132  continue;
133  }
134 
135  // Attempt to link in the new element.
// A weak CAS is fine here: on failure (real or spurious) we just retry.
136  marked_ptr null{};
137  // (6) - this release-CAS synchronizes-with the acquire-load (2, 4, 9).
138  if (t->next.compare_exchange_weak(null, n, std::memory_order_release, std::memory_order_relaxed))
139  break;
140 
// CAS failed (contention or spurious failure) — back off before retrying.
141  backoff();
142  }
143 
144  // Swing the tail to the new element.
// This CAS may legitimately fail: another thread may already have helped
// advance `tail` past our node (see (5)/(10)); no retry is needed.
145  marked_ptr expected = t.get();
146  // (7) - this release-CAS synchronizes-with the acquire-load (3)
147  tail.compare_exchange_strong(expected, n, std::memory_order_release, std::memory_order_relaxed);
148 }
149 
// Dequeue: unlink the dummy node and move the value out of its successor,
// which becomes the new dummy. Returns false when the dummy has no
// successor (queue empty).
// NOTE(review): the function signature line (michael_scott_queue.hpp:151)
// is missing from this extract.
150 template <class T, class... Policies>
152 {
153  backoff backoff;
154 
155  guard_ptr h;
156  for (;;)
157  {
158  // Get the old head and tail elements.
159  // (8) - this acquire-load synchronizes-with the release-CAS (11)
160  h.acquire(head, std::memory_order_acquire);
161 
162  // Get the head element's successor.
163  // (9) - this acquire-load synchronizes-with the release-CAS (6).
164  auto next = acquire_guard(h->next, std::memory_order_acquire);
// Re-check that `head` has not moved since we read it; if it has, the
// `next` we just read may belong to an already-popped node — retry.
165  if (head.load(std::memory_order_relaxed).get() != h.get())
166  continue;
167 
168  // If the head (dummy) element is the only one, return false to signal that
169  // the operation has failed (no element has been returned).
170  if (next.get() == nullptr)
171  return false;
172 
173  marked_ptr t = tail.load(std::memory_order_relaxed);
174 
175  // There are multiple elements. Help update tail if needed.
// If `tail` still points at the dummy we are about to unlink, advance it
// first so that `tail` never ends up pointing at a reclaimed node.
176  if (h.get() == t.get())
177  {
178  // (10) - this release-CAS synchronizes-with the acquire-load (3)
179  tail.compare_exchange_weak(t, next, std::memory_order_release, std::memory_order_relaxed);
180  continue;
181  }
182 
183  // Attempt to update the head pointer so that it points to the new dummy node.
184  marked_ptr expected(h.get());
185  // (11) - this release-CAS synchronizes-with the acquire-load (1, 8)
186  if (head.compare_exchange_weak(expected, next, std::memory_order_release, std::memory_order_relaxed))
187  {
188  // return the data of head's successor; it is the new dummy node.
189  result = std::move(next->value);
190  break;
191  }
192 
// CAS failed (contention or spurious failure) — back off before retrying.
193  backoff();
194  }
195 
196  // The old dummy node has been unlinked, so reclaim it.
// reclaim() presumably defers destruction until no thread can still hold a
// guarded reference to the old dummy — confirm against the reclaimer docs.
197  h.reclaim();
198 
199  return true;
200 }
201 }
202 
203 #ifdef _MSC_VER
204 #pragma warning(pop)
205 #endif
206 
207 #endif
// Documentation cross-references (residue from the rendered doxygen page):
// - michael_scott_queue: an unbounded generic lock-free multi-producer/
//   multi-consumer FIFO queue. Definition: michael_scott_queue.hpp:37.
// - void push(T value). Definition: michael_scott_queue.hpp:111.
// - bool try_pop(T &result). Definition: michael_scott_queue.hpp:151.
// - no_backoff: dummy backoff strategy that does nothing. Definition: backoff.hpp:17.
// - policy::backoff: policy to configure the backoff strategy. Definition: policy.hpp:39.
// - policy::reclaimer: policy to configure the reclamation scheme to be used. Definition: policy.hpp:25.