#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

#include <qb/qbipc_common.h>

#define MESSAGE_REQ_SYNC_BARRIER 0
#define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
#define MESSAGE_REQ_SYNC_MEMB_DETERMINE 2

struct service_entry {
	int service_id;
	void (*sync_init) (
		const unsigned int *trans_list,
		size_t trans_list_entries,
		const unsigned int *member_list,
		size_t member_list_entries,
		const struct memb_ring_id *ring_id);
	void (*sync_abort) (void);
	int (*sync_process) (void);
	void (*sync_activate) (void);
	enum sync_process_state state;
	char name[128];
};
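/*
 * Each registered service is tracked by one service_entry: the callbacks it
 * provides for the synchronization phases (init, process, abort, activate),
 * the state it has reached, and a printable name used for logging.
 */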
struct processor_entry {
	int nodeid;
	int received;
};

struct req_exec_memb_determine_message {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	struct memb_ring_id ring_id __attribute__((aligned(8)));
};

struct req_exec_service_build_message {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	struct memb_ring_id ring_id __attribute__((aligned(8)));
	int service_list_entries __attribute__((aligned(8)));
	int service_list[SERVICES_COUNT_MAX] __attribute__((aligned(8)));
};

struct req_exec_barrier_message {
	struct qb_ipc_request_header header __attribute__((aligned(8)));
	struct memb_ring_id ring_id __attribute__((aligned(8)));
};
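/*
 * Three message types drive the sync protocol on the wire:
 * MESSAGE_REQ_SYNC_MEMB_DETERMINE announces a node for membership
 * determination, MESSAGE_REQ_SYNC_SERVICE_BUILD carries the list of service
 * ids a node wants synchronized, and MESSAGE_REQ_SYNC_BARRIER signals that a
 * node has finished processing the current service.  Every message carries
 * the ring id so messages from an old ring can be recognized and discarded.
 */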
static int my_memb_determine = 0;
static unsigned int my_memb_determine_list[PROCESSOR_COUNT_MAX];
static unsigned int my_memb_determine_list_entries = 0;
static struct memb_ring_id my_memb_determine_ring_id;
static int my_processing_idx = 0;
static hdb_handle_t my_schedwrk_handle;
static struct memb_ring_id my_ring_id;
static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
static size_t my_member_list_entries = 0;
static unsigned int my_trans_list[PROCESSOR_COUNT_MAX];
static size_t my_trans_list_entries = 0;
static struct processor_entry my_processor_list[PROCESSOR_COUNT_MAX];
static int my_processor_list_entries = 0;
static struct service_entry my_service_list[SERVICES_COUNT_MAX];
static int my_service_list_entries = 0;
static void (*sync_synchronization_completed) (void);
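/*
 * my_processor_list/my_processor_list_entries track which nodes have been
 * heard from during the current service-build or barrier round, while
 * my_service_list and my_processing_idx track which service is currently
 * being synchronized.
 */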
static void sync_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required);

static int schedwrk_processor (const void *context);

static void sync_process_enter (void);

static void *sync_group_handle;

static int (*my_sync_callbacks_retrieve) (
	int service_id,
	struct sync_callbacks *callbacks);
int sync_init (
	int (*sync_callbacks_retrieve) (
		int service_id,
		struct sync_callbacks *callbacks),
	void (*synchronization_completed) (void))
{
	if (totempg_groups_initialize (&sync_group_handle,
		sync_deliver_fn, NULL) == -1) {
		log_printf (LOGSYS_LEVEL_ERROR,
			"Couldn't initialize groups interface.");
		return (-1);
	}

	sync_synchronization_completed = synchronization_completed;
	my_sync_callbacks_retrieve = sync_callbacks_retrieve;

	return (0);
}
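/*
 * sync_init() is called once at startup: it creates the totempg groups
 * instance used for sync messaging and records the two callbacks supplied by
 * the caller - one to look up per-service sync callbacks by service id and
 * one to report that synchronization has completed.
 */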
static void sync_barrier_handler (unsigned int nodeid, const void *msg)
{
	const struct req_exec_barrier_message *req_exec_barrier_message = msg;
	int i;
	int barrier_reached = 1;

	if (memcmp (&my_ring_id, &req_exec_barrier_message->ring_id,
	    sizeof (struct memb_ring_id)) != 0) {
		/* barrier message from an old ring - ignore it */
		return;
	}
	for (i = 0; i < my_processor_list_entries; i++) {
		if (my_processor_list[i].nodeid == nodeid) {
			my_processor_list[i].received = 1;
		}
	}
	for (i = 0; i < my_processor_list_entries; i++) {
		if (my_processor_list[i].received == 0) {
			barrier_reached = 0;
		}
	}
	if (barrier_reached) {
		log_printf (LOGSYS_LEVEL_DEBUG, "Committing synchronization for (%s)",
			my_service_list[my_processing_idx].name);
		my_service_list[my_processing_idx].sync_activate ();

		my_processing_idx += 1;
		if (my_service_list_entries == my_processing_idx) {
			my_memb_determine_list_entries = 0;
			sync_synchronization_completed ();
		} else {
			sync_process_enter ();
		}
	}
}
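/*
 * The barrier is reached only when every node in my_processor_list has sent
 * a barrier message for the current ring.  At that point the service being
 * processed is committed and my_processing_idx advances; when the last
 * service has been committed the completion callback fires, otherwise the
 * next service enters its process phase.
 */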
static void dummy_sync_init (
	const unsigned int *trans_list,
	size_t trans_list_entries,
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
}

static void dummy_sync_abort (void)
{
}

static int dummy_sync_process (void)
{
	return (0);
}

static void dummy_sync_activate (void)
{
}
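/*
 * The dummy callbacks stand in for services that appear in another node's
 * service-build list but are not loaded locally, so the engine can step
 * through them without special-casing missing callbacks.
 */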
static int service_entry_compare (const void *a, const void *b)
{
	return (((const struct service_entry *)a)->service_id >
		((const struct service_entry *)b)->service_id);
}
static void sync_memb_determine (unsigned int nodeid, const void *msg)
{
	const struct req_exec_memb_determine_message *req_exec_memb_determine_message = msg;
	unsigned int i;

	if (memcmp (&req_exec_memb_determine_message->ring_id,
	    &my_memb_determine_ring_id, sizeof (struct memb_ring_id)) != 0) {
		/* message from an old ring - ignore it */
		return;
	}

	my_memb_determine = 1;
	for (i = 0; i < my_memb_determine_list_entries; i++) {
		if (my_memb_determine_list[i] == nodeid) {
			return;
		}
	}
	my_memb_determine_list[my_memb_determine_list_entries] = nodeid;
	my_memb_determine_list_entries += 1;
}
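/*
 * sync_memb_determine() collects, per ring, the set of node ids that have
 * announced themselves via MESSAGE_REQ_SYNC_MEMB_DETERMINE.  Duplicate
 * announcements from the same node are ignored.
 */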
static void sync_service_build_handler (unsigned int nodeid, const void *msg)
{
	const struct req_exec_service_build_message *req_exec_service_build_message = msg;
	int i, j;
	int barrier_reached = 1;
	int found;
	int qsort_trigger = 0;

	if (memcmp (&my_ring_id, &req_exec_service_build_message->ring_id,
	    sizeof (struct memb_ring_id)) != 0) {
		/* service build message from an old ring - ignore it */
		return;
	}
	for (i = 0; i < req_exec_service_build_message->service_list_entries; i++) {
		found = 0;
		for (j = 0; j < my_service_list_entries; j++) {
			if (req_exec_service_build_message->service_list[i] ==
			    my_service_list[j].service_id) {
				found = 1;
				break;
			}
		}
		if (found == 0) {
			my_service_list[my_service_list_entries].state = PROCESS;
			my_service_list[my_service_list_entries].service_id =
				req_exec_service_build_message->service_list[i];
			sprintf (my_service_list[my_service_list_entries].name,
				"Unknown External Service (id = %d)\n",
				req_exec_service_build_message->service_list[i]);
			my_service_list[my_service_list_entries].sync_init = dummy_sync_init;
			my_service_list[my_service_list_entries].sync_abort = dummy_sync_abort;
			my_service_list[my_service_list_entries].sync_process = dummy_sync_process;
			my_service_list[my_service_list_entries].sync_activate = dummy_sync_activate;
			my_service_list_entries += 1;

			qsort_trigger = 1;
		}
	}
	if (qsort_trigger) {
		qsort (my_service_list, my_service_list_entries,
			sizeof (struct service_entry), service_entry_compare);
	}
	for (i = 0; i < my_processor_list_entries; i++) {
		if (my_processor_list[i].nodeid == nodeid) {
			my_processor_list[i].received = 1;
		}
	}
	for (i = 0; i < my_processor_list_entries; i++) {
		if (my_processor_list[i].received == 0) {
			barrier_reached = 0;
		}
	}
	if (barrier_reached) {
		sync_process_enter ();
	}
}
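/*
 * Service ids advertised by other nodes but unknown locally are added as
 * "Unknown External Service" entries backed by the dummy callbacks, and the
 * merged list is kept sorted by service id so every node processes services
 * in the same order.  Once all nodes' service-build messages have arrived,
 * processing of the first service starts.
 */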
static void sync_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct qb_ipc_request_header *header = (struct qb_ipc_request_header *)msg;

	switch (header->id) {
	case MESSAGE_REQ_SYNC_BARRIER:
		sync_barrier_handler (nodeid, msg);
		break;
	case MESSAGE_REQ_SYNC_SERVICE_BUILD:
		sync_service_build_handler (nodeid, msg);
		break;
	case MESSAGE_REQ_SYNC_MEMB_DETERMINE:
		sync_memb_determine (nodeid, msg);
		break;
	}
}
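/*
 * sync_deliver_fn() is the totempg delivery callback for the sync group; it
 * dispatches on the libqb request header id to one of the three handlers
 * above.
 */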
static void memb_determine_message_transmit (void)
{
	struct iovec iovec;
	struct req_exec_memb_determine_message req_exec_memb_determine_message;

	req_exec_memb_determine_message.header.size =
		sizeof (struct req_exec_memb_determine_message);
	req_exec_memb_determine_message.header.id = MESSAGE_REQ_SYNC_MEMB_DETERMINE;

	memcpy (&req_exec_memb_determine_message.ring_id,
		&my_memb_determine_ring_id,
		sizeof (struct memb_ring_id));

	iovec.iov_base = (char *)&req_exec_memb_determine_message;
	iovec.iov_len = sizeof (req_exec_memb_determine_message);

	totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
}
static void barrier_message_transmit (void)
{
	struct iovec iovec;
	struct req_exec_barrier_message req_exec_barrier_message;

	req_exec_barrier_message.header.size =
		sizeof (struct req_exec_barrier_message);
	req_exec_barrier_message.header.id = MESSAGE_REQ_SYNC_BARRIER;

	memcpy (&req_exec_barrier_message.ring_id, &my_ring_id,
		sizeof (struct memb_ring_id));

	iovec.iov_base = (char *)&req_exec_barrier_message;
	iovec.iov_len = sizeof (req_exec_barrier_message);

	totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
}
static void service_build_message_transmit (
	struct req_exec_service_build_message *service_build_message)
{
	struct iovec iovec;

	service_build_message->header.size =
		sizeof (struct req_exec_service_build_message);
	service_build_message->header.id = MESSAGE_REQ_SYNC_SERVICE_BUILD;

	memcpy (&service_build_message->ring_id, &my_ring_id,
		sizeof (struct memb_ring_id));

	iovec.iov_base = (void *)service_build_message;
	iovec.iov_len = sizeof (struct req_exec_service_build_message);

	totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
}
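/*
 * The three *_message_transmit() helpers share a pattern: fill in the libqb
 * request header (id and size), copy in the relevant ring id, wrap the
 * message in an iovec and multicast it to the sync group via
 * totempg_groups_mcast_joined().
 */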
static void sync_barrier_enter (void)
{
	barrier_message_transmit ();
}
static void sync_process_enter (void)
{
	int i;

	if (my_service_list_entries == 0) {
		/* no services to synchronize - report completion immediately */
		my_memb_determine_list_entries = 0;
		sync_synchronization_completed ();
		return;
	}
	for (i = 0; i < my_processor_list_entries; i++) {
		my_processor_list[i].received = 0;
	}
	schedwrk_create (&my_schedwrk_handle, schedwrk_processor, NULL);
}
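/*
 * sync_process_enter() resets the per-node "received" flags for the next
 * barrier round and schedules schedwrk_processor() to run the current
 * service's sync work.
 */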
static void sync_servicelist_build_enter (
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
	struct req_exec_service_build_message service_build;
	struct sync_callbacks sync_callbacks;
	int res;
	int i;

	for (i = 0; i < member_list_entries; i++) {
		my_processor_list[i].nodeid = member_list[i];
		my_processor_list[i].received = 0;
	}
	my_processor_list_entries = member_list_entries;

	memcpy (my_member_list, member_list,
		member_list_entries * sizeof (unsigned int));
	my_member_list_entries = member_list_entries;

	my_processing_idx = 0;

	memset (my_service_list, 0,
		sizeof (struct service_entry) * SERVICES_COUNT_MAX);
	my_service_list_entries = 0;

	for (i = 0; i < SERVICES_COUNT_MAX; i++) {
		res = my_sync_callbacks_retrieve (i, &sync_callbacks);
		if (res == -1 || sync_callbacks.sync_init == NULL) {
			continue;
		}
		my_service_list[my_service_list_entries].state = INIT;
		my_service_list[my_service_list_entries].service_id = i;
		strcpy (my_service_list[my_service_list_entries].name,
			sync_callbacks.name);
		my_service_list[my_service_list_entries].sync_init = sync_callbacks.sync_init;
		my_service_list[my_service_list_entries].sync_process = sync_callbacks.sync_process;
		my_service_list[my_service_list_entries].sync_abort = sync_callbacks.sync_abort;
		my_service_list[my_service_list_entries].sync_activate = sync_callbacks.sync_activate;
		my_service_list_entries += 1;
	}

	for (i = 0; i < my_service_list_entries; i++) {
		service_build.service_list[i] =
			my_service_list[i].service_id;
	}
	service_build.service_list_entries = my_service_list_entries;

	service_build_message_transmit (&service_build);
}
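/*
 * sync_servicelist_build_enter() snapshots the membership, queries every
 * possible service id through my_sync_callbacks_retrieve() to build the
 * local service list, then multicasts that list so all nodes can agree on
 * the set of services to synchronize.
 *
 * A hypothetical retrieve callback (names below are illustrative only, not
 * part of this file) would fill the sync_callbacks structure roughly like:
 *
 *	static int example_callbacks_retrieve (int service_id,
 *		struct sync_callbacks *cbs)
 *	{
 *		if (service_id != EXAMPLE_SERVICE_ID)
 *			return (-1);
 *		cbs->name = "example";
 *		cbs->sync_init = example_sync_init;
 *		cbs->sync_process = example_sync_process;
 *		cbs->sync_abort = example_sync_abort;
 *		cbs->sync_activate = example_sync_activate;
 *		return (0);
 *	}
 */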
static int schedwrk_processor (const void *context)
{
	int res = 0;

	if (my_service_list[my_processing_idx].state == INIT) {
		unsigned int old_trans_list[PROCESSOR_COUNT_MAX];
		size_t old_trans_list_entries = 0;
		size_t o, m;

		my_service_list[my_processing_idx].state = PROCESS;

		memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
			sizeof (unsigned int));
		old_trans_list_entries = my_trans_list_entries;

		/* keep only transitional nodes still present in the new membership */
		my_trans_list_entries = 0;
		for (o = 0; o < old_trans_list_entries; o++) {
			for (m = 0; m < my_member_list_entries; m++) {
				if (old_trans_list[o] == my_member_list[m]) {
					my_trans_list[my_trans_list_entries] = my_member_list[m];
					my_trans_list_entries++;
					break;
				}
			}
		}

		my_service_list[my_processing_idx].sync_init (my_trans_list,
			my_trans_list_entries, my_member_list,
			my_member_list_entries,
			&my_ring_id);
	}
	if (my_service_list[my_processing_idx].state == PROCESS) {
		res = my_service_list[my_processing_idx].sync_process ();
		if (res == 0) {
			sync_barrier_enter ();
		} else {
			return (-1);
		}
	}
	return (0);
}
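/*
 * schedwrk_processor() is the work function handed to schedwrk_create().  On
 * the first run for a service it trims the saved transitional node list down
 * to nodes still present in the new membership and calls the service's
 * sync_init(); afterwards it calls sync_process() until that returns 0, at
 * which point the barrier for this service is entered.  Returning non-zero
 * leaves the work item scheduled so it runs again.
 */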
void sync_start (
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
	memcpy (&my_ring_id, ring_id, sizeof (struct memb_ring_id));

	if (my_memb_determine) {
		my_memb_determine = 0;
		sync_servicelist_build_enter (my_memb_determine_list,
			my_memb_determine_list_entries, ring_id);
	} else {
		sync_servicelist_build_enter (member_list, member_list_entries,
			ring_id);
	}
}
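/*
 * sync_start() begins synchronization for a new ring.  If membership
 * determination ran first, the node list gathered by sync_memb_determine()
 * is used; otherwise the member list delivered with the configuration change
 * is used directly.
 */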
void sync_save_transitional (
	const unsigned int *member_list,
	size_t member_list_entries,
	const struct memb_ring_id *ring_id)
{
	memcpy (my_trans_list, member_list, member_list_entries *
		sizeof (unsigned int));
	my_trans_list_entries = member_list_entries;
}
void sync_abort (void)
{
	my_service_list[my_processing_idx].sync_abort ();
}
void sync_memb_list_determine (const struct memb_ring_id *ring_id)
{
	memcpy (&my_memb_determine_ring_id, ring_id,
		sizeof (struct memb_ring_id));

	memb_determine_message_transmit ();
}
void sync_memb_list_abort (void)
{
	my_memb_determine_list_entries = 0;
	memset (&my_memb_determine_ring_id, 0,
		sizeof (struct memb_ring_id));
}
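/*
 * Overall flow for one configuration change: sync_save_transitional() saves
 * the transitional membership, sync_memb_list_determine() multicasts this
 * node's presence, and sync_start() snapshots the ring and enters the
 * service-list build; each service is then taken through init -> process ->
 * barrier in turn, and when the last barrier completes the
 * synchronization-completed callback is invoked.  sync_abort() stops the
 * service currently being synchronized and sync_memb_list_abort() clears the
 * membership-determination state when the ring changes again mid-way.
 */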