//go:build go1.19 // +build go1.19 package xsync import ( "runtime" "sync/atomic" "unsafe" ) // A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent // queue. It's a generic version of MPMCQueue. // // MPMCQueue instances must be created with NewMPMCQueueOf function. // A MPMCQueueOf must not be copied after first use. // // Based on the data structure from the following C++ library: // https://github.com/rigtorp/MPMCQueue type MPMCQueueOf[I any] struct { cap uint64 head uint64 //lint:ignore U1000 prevents false sharing hpad [cacheLineSize - 8]byte tail uint64 //lint:ignore U1000 prevents false sharing tpad [cacheLineSize - 8]byte slots []slotOfPadded[I] } type slotOfPadded[I any] struct { slotOf[I] // Unfortunately, proper padding like the below one: // // pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte // // won't compile, so here we add a best-effort padding for items up to // 56 bytes size. //lint:ignore U1000 prevents false sharing pad [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte } type slotOf[I any] struct { // atomic.Uint64 is used here to get proper 8 byte alignment on // 32-bit archs. turn atomic.Uint64 item I } // NewMPMCQueueOf creates a new MPMCQueueOf instance with the given // capacity. func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] { if capacity < 1 { panic("capacity must be positive number") } return &MPMCQueueOf[I]{ cap: uint64(capacity), slots: make([]slotOfPadded[I], capacity), } } // Enqueue inserts the given item into the queue. // Blocks, if the queue is full. func (q *MPMCQueueOf[I]) Enqueue(item I) { head := atomic.AddUint64(&q.head, 1) - 1 slot := &q.slots[q.idx(head)] turn := q.turn(head) * 2 for slot.turn.Load() != turn { runtime.Gosched() } slot.item = item slot.turn.Store(turn + 1) } // Dequeue retrieves and removes the item from the head of the queue. // Blocks, if the queue is empty. func (q *MPMCQueueOf[I]) Dequeue() I { var zeroedI I tail := atomic.AddUint64(&q.tail, 1) - 1 slot := &q.slots[q.idx(tail)] turn := q.turn(tail)*2 + 1 for slot.turn.Load() != turn { runtime.Gosched() } item := slot.item slot.item = zeroedI slot.turn.Store(turn + 1) return item } // TryEnqueue inserts the given item into the queue. Does not block // and returns immediately. The result indicates that the queue isn't // full and the item was inserted. func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { head := atomic.LoadUint64(&q.head) for { slot := &q.slots[q.idx(head)] turn := q.turn(head) * 2 if slot.turn.Load() == turn { if atomic.CompareAndSwapUint64(&q.head, head, head+1) { slot.item = item slot.turn.Store(turn + 1) return true } } else { prevHead := head head = atomic.LoadUint64(&q.head) if head == prevHead { return false } } runtime.Gosched() } } // TryDequeue retrieves and removes the item from the head of the // queue. Does not block and returns immediately. The ok result // indicates that the queue isn't empty and an item was retrieved. func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) { tail := atomic.LoadUint64(&q.tail) for { slot := &q.slots[q.idx(tail)] turn := q.turn(tail)*2 + 1 if slot.turn.Load() == turn { if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { var zeroedI I item = slot.item ok = true slot.item = zeroedI slot.turn.Store(turn + 1) return } } else { prevTail := tail tail = atomic.LoadUint64(&q.tail) if tail == prevTail { return } } runtime.Gosched() } } func (q *MPMCQueueOf[I]) idx(i uint64) uint64 { return i % q.cap } func (q *MPMCQueueOf[I]) turn(i uint64) uint64 { return i / q.cap }