Shared Persistent Heap Data Environment Manual  1.1.0
sphmultipcqueue.h File Reference

Shared Persistent Heap, multi producer multi consumer queue. For shared memory multi-thread/multi-core applications. This implementation uses transactional memory operations to implement Lock Free Producer/Consumer queues (SPHMultiPCQueue_t).

#include "sastype.h"
#include "sphlfentry.h"
#include "sphdirectpcqueue.h"


Macros

#define __C__
 ignore this macro behind the curtain
 
#define SPHMPMCQ_CIRCULAR   (1)
 internal options flag for circular log buffers
 
#define SPHMPMCQ_CIRCULAR_WRAPPED   (1<<1)
 internal options flag set when circular log buffers have wrapped
 
#define SPHMPMCQ_CIRCULAR_NOTFIRST   (1<<2)
 internal options flag set when circular log buffers have wrapped multiple times
 
#define SPHMPMCQ_CACHE_PREFETCH0   (1<<3)
 internal options flag for prefetching the immediate (0 offset) cache-line
 
#define SPHMPMCQ_CACHE_PREFETCH1   (1<<4)
 internal options flag for prefetching the next (line size offset) cache-line
 
#define SPHMPMCQ_CIRCULAR_RESETMASK
 internal options mask flag used to reset circular log buffers
 

Typedefs

typedef void * SPHMPMCQ_t
 Handle to an instance of SPH Multi Producer, Multi Consumer Queue.
 

Functions

__C__ SPHMPMCQ_t SPHMPMCQInit (void *buf_seg, block_size_t buf_size)
 Initialize a shared storage block as a MPMC Queue.

__C__ SPHMPMCQ_t SPHMPMCQInitWithStride (void *buf_seg, block_size_t buf_size, unsigned short entry_stride, unsigned int options)
 Initialize a shared storage block as a Multi Producer Multi Consumer Queue with a fixed entry stride.

__C__ SPHMPMCQ_t SPHMPMCQCreate (block_size_t buf_size)
 Allocate and initialize a shared storage block as a Multi Producer Multi Consumer Queue.

__C__ SPHMPMCQ_t SPHMPMCQCreateWithStride (block_size_t buf_size, unsigned short stride)
 Allocate and initialize a shared storage block as a Lock Free Multi Producer Multi Consumer Queue.

__C__ int SPHMPMCQGetStride (SPHMPMCQ_t queue)
 Return the entry stride for an existing Lock Free Multi Producer Multi Consumer Queue.

__C__ int SPHMPMCQGetEntries (SPHMPMCQ_t queue)
 Return the total number of entries for an existing Lock Free Multi Producer Multi Consumer Queue.

__C__ int SPHMPMCQIsEmpty (SPHMPMCQ_t queue)
 Return the status of the specified queue.

__C__ int SPHMPMCQIsFull (SPHMPMCQ_t queue)
 Return the status of the specified queue.

__C__ block_size_t SPHMPMCQFreeSpace (SPHMPMCQ_t queue)
 Returns the amount of free space (in bytes) remaining in the specified queue.

__C__ sphLFEntryID_t SPHMPMCQGetEntryTemplate (SPHMPMCQ_t queue)
 Return the entry template for an existing Lock Free Multi Producer Multi Consumer Queue. This template is used later to mark an allocated entry complete.

__C__ SPHLFEntryDirect_t SPHMPMCQAllocStrideDirectHTM (SPHMPMCQ_t queue)
 Allows the producer thread to allocate and initialize the header of a queue entry for access. The allocation is from the specified Multi Producer Multi Consumer Queue.

__C__ SPHLFEntryDirect_t SPHSPMCQAllocStrideDirect (SPHMPMCQ_t queue)
 Allows the producer thread to allocate and initialize the header of a queue entry for access. The allocation is from the specified Multi Producer Multi Consumer Queue. This function uses no synchronization, so only safe with external synchronization, or for use by a single producer, thus the "SPMC" moniker.

__C__ SPHLFEntryDirect_t SPHMPMCQGetNextCompleteDirectHTM (SPHMPMCQ_t queue)
 Allows the consumer to get the next completed queue entry from the specified multi producer multi consumer queue.

__C__ SPHLFEntryDirect_t SPHMPSCQGetNextCompleteDirect (SPHMPMCQ_t queue)
 Allows the consumer to get the next completed queue entry from the specified multi producer multi consumer queue. This function uses no synchronization, so only safe with external synchronization, or for use by a single consumer, thus the "MPSC" moniker.

__C__ int SPHMPMCQEntryDirectIsComplete (SPHLFEntryDirect_t directHandle)
 Return the status of the entry specified by the direct entry handle.
 
__C__ int SPHMPMCQFreeEntryDirect (SPHMPMCQ_t queue, SPHLFEntryDirect_t entry)
 Allows the consumer to free the queue entry it just processed (obtained via SPHMPMCQGetNextCompleteDirectHTM or SPHMPSCQGetNextCompleteDirect), from the specified multi producer multi consumer queue.

__C__ SPHLFEntryDirect_t SPHMPMCQGetNextEntry (SPHMPMCQ_t queue)
 Allows the consumer to get the next allocated queue entry from the specified multi producer multi consumer queue.

__C__ int SPHMPMCQSetCachePrefetch (SPHMPMCQ_t queue, int prefetch)
 Set the cache-line prefetch options for entry allocate.

__C__ int SPHMPMCQPrefetch (SPHMPMCQ_t queue)
 Prefetch pages from the specified queue.
 
__C__ int SPHMPMCQDestroy (SPHMPMCQ_t queue)
 Destroys the queue and frees the SAS storage for reuse.
 

Detailed Description

Shared Persistent Heap, multi producer multi consumer queue. For shared memory multi-thread/multi-core applications. This implementation uses transactional memory operations to implement Lock Free Producer/Consumer queues (SPHMultiPCQueue_t).

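This API supports atomic allocation of storage for queue entries for zero copy persistence and sharing. Zero copy queues divide the process of producing a queue entry into three steps: allocate the entry, fill it with application data, and mark it complete. The producer side looks like this: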

#include <stdio.h>
#include <sphlfentry.h>
#include <sphmultipcqueue.h>

typedef struct data_struct11 {
    double field0;
    int field1;
    int field2;
    void *field3;
} data_struct11;

    // Allocate zero copy queue entry
    handle = SPHMPMCQAllocStrideDirectHTM (pcqueue);
    if (handle)
    {   // insert data into the allocated queue entry
        data_struct11 *struct_ptr;
        struct_ptr = (data_struct11 *)SPHLFEntryDirectGetFreePtr(handle);
        if (struct_ptr)
        {   // store struct fields directly into allocated queue entry
            struct_ptr->field0 = data_double1;
            struct_ptr->field1 = data_int2;
            struct_ptr->field2 = data_int3;
            struct_ptr->field3 = (void*)sas_data_buff2;
        } else {
            printf("SPHLFEntryDirectGetFreePtr(%p) failed\n",
                   (void *)handle);
        }
        // Mark the entry complete and available to the consumer
        // by calling SPHLFEntryDirectComplete (see sphlfentry.h for its arguments)
    } else {
        // allocation failed, for example because the queue is full
        while (SPHMPMCQIsFull(pcqueue))
        {
            // pacing code
        }
    }

The consumer can access queue entries once they are marked complete. The consumer gets the next completed entry, reads its data, and then frees the entry for reuse:

#include <stdio.h>
#include <sphlfentry.h>
#include <sphmultipcqueue.h>

typedef struct data_struct11 {
    double field0;
    int field1;
    int field2;
    void *field3;
} data_struct11;

    // Get next completed queue entry
    handle = SPHMPMCQGetNextCompleteDirectHTM (pcqueue);
    if (handle)
    {   // read data from the completed queue entry
        data_struct11 *struct_ptr;
        struct_ptr = (data_struct11*)SPHLFEntryGetNextPtr (handle);
        if (struct_ptr)
        {   // access struct fields directly from queue entry
            data_double1 = struct_ptr->field0;
            data_int2 = struct_ptr->field1;
            data_int3 = struct_ptr->field2;
            sas_data_buff2 = struct_ptr->field3;
        } else {
            printf("SPHLFEntryGetNextPtr(%p) failed\n",
                   (void *)handle);
        }
        // Mark the entry free and available for reuse
        SPHMPMCQFreeEntryDirect (pcqueue, handle);
    } else {
        // nothing to consume yet, the queue is empty or the next entry is incomplete
        while (SPHMPMCQIsEmpty(pcqueue))
        {
            // pacing code
        }
    }
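
The "pacing code" placeholders in both examples are left to the application. One possible approach, shown here only as a sketch, is a bounded exponential backoff between polls of the full/empty check. The function name and the delay bounds are hypothetical and are not part of this API.

```c
#include <assert.h>

/* Hypothetical delay bounds for this example only. */
#define EX_BACKOFF_MIN_NS 100u
#define EX_BACKOFF_MAX_NS 100000u

/* Compute the next polling delay: start at the minimum, then double
 * the previous delay until the maximum is reached. */
static unsigned int ex_next_backoff_ns(unsigned int prev_ns)
{
    if (prev_ns < EX_BACKOFF_MIN_NS)
        return EX_BACKOFF_MIN_NS;
    if (prev_ns >= EX_BACKOFF_MAX_NS / 2)
        return EX_BACKOFF_MAX_NS;
    return prev_ns * 2;
}
```

A pacing loop could sleep for the returned delay (for example with nanosleep) before checking the queue again, and reset the delay to 0 once the check succeeds.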

In this implementation the allocation of the entry is serialized using Transactional Memory for atomic updates.
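
This reference does not show how the transactional update is coded. As a rough illustration of what an atomic entry allocation has to guarantee, the sketch below swaps in a C11 compare-and-swap loop in place of hardware transactional memory. The `ex_ring_t` type, its fields, and the `ex_` functions are hypothetical names for this example and do not reflect the SPHDE control block layout.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical model of a strided circular queue. */
typedef struct {
    atomic_size_t head;   /* count of entries allocated so far */
    atomic_size_t tail;   /* count of entries freed so far */
    size_t entries;       /* capacity in entries */
    size_t stride;        /* bytes per entry */
} ex_ring_t;

static void ex_ring_init(ex_ring_t *q, size_t entries, size_t stride)
{
    atomic_init(&q->head, 0);
    atomic_init(&q->tail, 0);
    q->entries = entries;
    q->stride = stride;
}

/* Claim the next entry atomically.
 * Returns the byte offset of the claimed entry, or (size_t)-1 if full. */
static size_t ex_alloc_slot(ex_ring_t *q)
{
    size_t head = atomic_load(&q->head);
    for (;;) {
        size_t tail = atomic_load(&q->tail);
        if (head - tail >= q->entries)
            return (size_t)-1;          /* queue full: allocation fails */
        /* On failure the CAS stores the current value into head,
         * so the loop retries with fresh state. */
        if (atomic_compare_exchange_weak(&q->head, &head, head + 1))
            return (head % q->entries) * q->stride;
    }
}
```

Two producers calling `ex_alloc_slot` concurrently can never receive the same offset, which is the property the transactional allocation provides for the real queue.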

As an option, the queue entry allocator will fill in a 4 or 16 byte entry header. The entry status and length are stored in this header, as described under the allocation functions.

Any additional storage allocated to the entry (after the header) is available for application specific data. This API also provides several mechanisms to store application data, including direct array or structure overlay and a streams-like mechanism. The API provides a completion function (SPHLFEntryDirectComplete) which provides any memory barriers required by the platform and marks the entry complete.
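
To illustrate why completion needs a memory barrier, here is a minimal sketch using C11 release/acquire atomics. It is a model of the idea, not the SPHLFEntryDirectComplete implementation; the `ex_entry_t` layout and the function names are assumptions made for this example.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical entry layout; not the SPHDE entry header format. */
typedef struct {
    atomic_uint status;   /* 0 = allocated, 1 = complete */
    uint32_t payload;     /* application data written before completion */
} ex_entry_t;

/* Producer: publish the entry. The release store orders the payload
 * write before the status change becomes visible to other threads. */
static void ex_entry_complete(ex_entry_t *e)
{
    atomic_store_explicit(&e->status, 1u, memory_order_release);
}

/* Consumer: the acquire load pairs with the release store, so a
 * nonzero result means the payload is safe to read. */
static int ex_entry_is_complete(ex_entry_t *e)
{
    return atomic_load_explicit(&e->status, memory_order_acquire) != 0;
}
```

Without the release/acquire pairing, a consumer on a weakly ordered platform could observe the entry as complete before the producer's data stores are visible.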

The API supports simple circular queues and requires a constant entry stride. A stride that matches or is a multiple of the cache line size can improve performance by avoiding "false sharing" of cache lines containing multiple queue entries across cores/sockets.
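
As a small worked example of choosing a stride, the sketch below rounds an application entry size up to a whole number of cache lines. The 128 byte line size and the helper name are assumptions for illustration; the actual line size depends on the target platform.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed cache-line size for this example; query the real value on
 * the target platform instead of hard-coding it. */
#define EX_CACHE_LINE 128u

/* Round an entry size up to the next multiple of the cache-line size. */
static size_t ex_round_stride(size_t entry_bytes)
{
    return (entry_bytes + EX_CACHE_LINE - 1) / EX_CACHE_LINE * EX_CACHE_LINE;
}
```

For a 24 byte structure this yields a stride of 128, so each entry occupies its own cache line. The result must still fit in the `unsigned short` stride parameter accepted by the create and init functions.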

Todo:
Additional work will include automatic pacing with hysteresis.

Macro Definition Documentation

#define SPHMPMCQ_CIRCULAR_RESETMASK
Value:
(SPHMPCQ_CIRCULAR | \
SPHMPCQ_CACHE_PREFETCH0 | \
SPHMPCQ_CACHE_PREFETCH1)

internal options mask flag used to reset circular log buffers
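
The sketch below shows how such a mask would typically be applied: ANDing the options word with the mask keeps the configuration bits (circular, prefetch) and drops the wrap-state bits. The flag values are copied from the Macros list above under an `EX_` prefix; how the library itself applies the mask is an assumption.

```c
#include <assert.h>

/* Local copies of the documented flag values, renamed with an EX_
 * prefix so they cannot clash with the real header. */
#define EX_CIRCULAR          (1)
#define EX_CIRCULAR_WRAPPED  (1 << 1)
#define EX_CIRCULAR_NOTFIRST (1 << 2)
#define EX_CACHE_PREFETCH0   (1 << 3)
#define EX_CACHE_PREFETCH1   (1 << 4)
#define EX_RESETMASK (EX_CIRCULAR | EX_CACHE_PREFETCH0 | EX_CACHE_PREFETCH1)

/* Keep the configuration bits and clear the wrap-state bits. */
static unsigned int ex_reset_options(unsigned int options)
{
    return options & EX_RESETMASK;
}
```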

Typedef Documentation

typedef void* SPHMPMCQ_t

Handle to an instance of SPH Multi Producer, Multi Consumer Queue.

The type is SAS_RUNTIME_PCQUEUE_TM

Function Documentation

__C__ SPHLFEntryDirect_t SPHMPMCQAllocStrideDirectHTM ( SPHMPMCQ_t  queue)

Allows the producer thread to allocate and initialize the header of a queue entry for access. The allocation is from the specified Multi Producer Multi Consumer Queue.

The allocation size is the stride set when the PC queue was initialized/created. The entry status and length are stored in the header of the new entry. Returns a direct entry handle which allows the application to insert application specific data into the entry via the sphlfentry.h API. If the specified queue is full the allocation may fail.

Note
The queue entry is not ready for access by the Consumer thread, until additional application data is inserted and the entry is completed (via SPHLFEntryDirectComplete). Category and Subcategory may be supplied as the entry is completed.
Parameters
queue  Handle of a producer consumer queue.
Returns
Direct handle of the initialized queue entry, or 0 (NULL) if the allocation failed. For example, the allocation may fail if the queue is full.
__C__ SPHMPMCQ_t SPHMPMCQCreate ( block_size_t  buf_size)

Allocate and initialize a shared storage block as a Multi Producer Multi Consumer Queue.

Allocate a block from SAS storage and initialize that block as a PC Queue. The storage block must be a power of two in size.

Parameters
buf_size  power of two size of the heap to be initialized.
Returns
a handle to the initialized SPHMPMCQ_t.
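
Because the create and init functions require a power of two size, a caller may want to validate or round up a requested size first. The helpers below are a hypothetical sketch and are not part of this API.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Nonzero if n is a power of two. */
static int ex_is_power_of_two(size_t n)
{
    return n != 0 && (n & (n - 1)) == 0;
}

/* Smallest power of two that is >= n, or 0 if it does not fit in size_t. */
static size_t ex_round_up_pow2(size_t n)
{
    if (n > (SIZE_MAX >> 1) + 1)
        return 0;
    size_t p = 1;
    while (p < n)
        p <<= 1;
    return p;
}
```

For example, a request for 3000 bytes would be rounded up to 4096 before being passed as `buf_size`.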
__C__ SPHMPMCQ_t SPHMPMCQCreateWithStride ( block_size_t  buf_size,
unsigned short  stride 
)

Allocate and initialize a shared storage block as a Lock Free Multi Producer Multi Consumer Queue.

Allocate a block from SAS storage and initialize that block as a Multi Producer Multi Consumer Queue with the given entry stride. The storage block must be a power of two in size.

Parameters
buf_size  power of two size of the heap to be initialized.
stride  the stride offset in bytes between allocated entries.
Returns
a handle to the initialized SPHMPMCQ_t.
__C__ int SPHMPMCQDestroy ( SPHMPMCQ_t  queue)

Destroys the queue and frees the SAS storage for reuse.

Parameters
queue  Handle to a queue to be destroyed.
Returns
0 if successful.
__C__ int SPHMPMCQEntryDirectIsComplete ( SPHLFEntryDirect_t  directHandle)

Return the status of the entry specified by the direct entry handle.

Parameters
directHandle  entry handle for an allocated entry.
Returns
True if the entry is complete (SPHLFEntryDirectComplete has been called for this entry), otherwise false.
__C__ int SPHMPMCQFreeEntryDirect ( SPHMPMCQ_t  queue,
SPHLFEntryDirect_t  entry 
)

Allows the consumer to free the queue entry it just processed (obtained via SPHMPMCQGetNextCompleteDirectHTM or SPHMPSCQGetNextCompleteDirect), from the specified multi producer multi consumer queue.

Mark the provided entry as free (unallocated and invalid) and if this entry is the current queue tail, bump the queue tail pointer to the next unallocated entry. If the specified queue is empty or the current tail entry is not yet completed the Free may fail.

Warning
The Consumer thread should not touch or modify a queue entry after calling SPHMPMCQFreeEntryDirect. This is important to both correctness and performance.
Parameters
queue  Handle of a producer consumer queue.
entry  queue entry to be marked free.
Returns
True for a successful tail free, otherwise failure. For example, the free may fail if the queue is empty or the current tail entry is not yet completed.
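
The tail handling described above can be pictured with a small single-threaded model. This is one interpretation of the description, not the library's code: the entry is marked free, and only when it is the current tail does the tail advance past every already freed entry. All names, the status values, and the fixed 4 entry capacity are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

enum { EX_FREE = 0, EX_ALLOCATED = 1, EX_COMPLETE = 2 };

#define EX_ENTRIES 4u

/* Hypothetical queue model: per-entry status plus running counters. */
typedef struct {
    int status[EX_ENTRIES];
    size_t head;  /* total entries allocated */
    size_t tail;  /* total entries retired */
} ex_queue_t;

/* Mark entry `index` free. Returns 1 if the tail advanced,
 * 0 if the entry was only marked free. */
static int ex_free_entry(ex_queue_t *q, size_t index)
{
    q->status[index] = EX_FREE;
    if (index != q->tail % EX_ENTRIES)
        return 0;
    /* Skip over this entry and any later entries freed out of order. */
    while (q->tail != q->head && q->status[q->tail % EX_ENTRIES] == EX_FREE)
        q->tail++;
    return 1;
}
```

With multiple consumers, entries can be freed out of order; in this model the space is only reclaimed once the oldest outstanding entry is freed. A real implementation would also need atomic updates, which this model omits.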
__C__ block_size_t SPHMPMCQFreeSpace ( SPHMPMCQ_t  queue)

Returns the amount of free space (in bytes) remaining in the specified queue.

Parameters
queue  Handle to a queue.
Returns
number of bytes of free space remaining in the queue buffer.
__C__ int SPHMPMCQGetEntries ( SPHMPMCQ_t  queue)

Return the total number of entries for an existing Lock Free Multi Producer Multi Consumer Queue.

Parameters
queue  Handle of a producer consumer queue.
Returns
total number of entries for strided queues, 0 if not strided, or -1 if the handle is not a valid SPHMPMCQ_t.
__C__ sphLFEntryID_t SPHMPMCQGetEntryTemplate ( SPHMPMCQ_t  queue)

Return the entry template for an existing Lock Free Multi Producer Multi Consumer Queue. This template is used later to mark an allocated entry complete.

Parameters
queue  Handle of a producer consumer queue.
Returns
the entry template for this queue or 0 if not a valid SPHMPMCQ_t.
__C__ SPHLFEntryDirect_t SPHMPMCQGetNextCompleteDirectHTM ( SPHMPMCQ_t  queue)

Allows the consumer to get the next completed queue entry from the specified multi producer multi consumer queue.

Returns a direct entry handle which allows the application to access the application specific data inserted by the producer thread. If the specified queue is empty or the next queue entry is not yet completed the get may fail.

Parameters
queue  Handle of a producer consumer queue.
Returns
Direct handle of the queue entry, or 0 (NULL) if the get failed. For example, the get may fail if the queue is empty or the next tail entry is not yet completed.
__C__ SPHLFEntryDirect_t SPHMPMCQGetNextEntry ( SPHMPMCQ_t  queue)

Allows the consumer to get the next allocated queue entry from the specified multi producer multi consumer queue.

Returns a direct entry handle which allows the application to access the entry allocated by the producer thread. If the specified queue is empty or the next queue entry is not yet allocated the get may fail. Returning an entry does not mean that the producer has completed the entry; the consumer must wait/spin (SPHLFEntryDirectIsComplete) for the entry to become complete.

Parameters
queue  Handle of a producer consumer queue.
Returns
Direct handle of the queue entry, or 0 (NULL) if the get failed. For example, the get may fail if the queue is empty or the next tail entry is not yet allocated.
__C__ int SPHMPMCQGetStride ( SPHMPMCQ_t  queue)

Return the entry stride for an existing Lock Free Multi Producer Multi Consumer Queue.

Parameters
queue  Handle of a producer consumer queue.
Returns
the entry stride for strided queues, 0 if not strided, or -1 if the handle is not a valid SPHMPMCQ_t.
__C__ SPHMPMCQ_t SPHMPMCQInit ( void *  buf_seg,
block_size_t  buf_size 
)

Initialize a shared storage block as a MPMC Queue.

Initialize the specified storage block as MPMC Queue control blocks. The storage block must be a power of two in size and have the same power of two (or better) alignment. The type should be SAS_RUNTIME_PCQUEUE_TM.

Parameters
buf_seg  a block of allocated SAS storage matching the buf_size.
buf_size  power of two size of the heap to be initialized.
Returns
a handle to the initialized SPHMPMCQ_t.
__C__ SPHMPMCQ_t SPHMPMCQInitWithStride ( void *  buf_seg,
block_size_t  buf_size,
unsigned short  entry_stride,
unsigned int  options 
)

Initialize a shared storage block as a Multi Producer Multi Consumer Queue with a fixed entry stride.

Initialize the specified storage block as MPMC Queue control blocks. The stride and control flags are also stored. The storage block must be a power of two in size and have the same power of two (or better) alignment. The type should be SAS_RUNTIME_PCQUEUE_TM.

Parameters
buf_seg  a block of allocated SAS storage matching the buf_size.
buf_size  power of two size of the heap to be initialized.
entry_stride  the stride offset in bytes between allocated entries.
options  option bits.
Returns
a handle to the initialized SPHMPMCQ_t.
__C__ int SPHMPMCQIsEmpty ( SPHMPMCQ_t  queue)

Return the status of the specified queue.

Parameters
queue  Handle to a queue.
Returns
True if the queue is currently empty (no entries), otherwise false.
__C__ int SPHMPMCQIsFull ( SPHMPMCQ_t  queue)

Return the status of the specified queue.

Parameters
queue  Handle to a queue.
Returns
True if the queue is currently full, otherwise false.
__C__ int SPHMPMCQPrefetch ( SPHMPMCQ_t  queue)

Prefetch pages from the specified queue.

Parameters
queue  Handle to a queue.
Returns
0 if successful.
__C__ int SPHMPMCQSetCachePrefetch ( SPHMPMCQ_t  queue,
int  prefetch 
)

Set the cache-line prefetch options for entry allocate.

prefetch == 0; No prefetch issued.
prefetch == 1; Prefetch the currently allocated cache-line.
prefetch == 2; Prefetch the cache-line following the allocated entry.
prefetch == 3; Prefetch the current and next cache-lines.

Parameters
queue  Handle to a queue.
prefetch  prefetch option code.
Returns
0 if successful.
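
The four option codes line up with the two prefetch flags listed in the Macros section (bit 0 of the code selects the current line, bit 1 the next line). The sketch below shows that mapping under the assumption that the library stores the code as those flag bits; the `EX_` names and the helper are hypothetical.

```c
#include <assert.h>

/* Local copies of the documented prefetch flag values. */
#define EX_CACHE_PREFETCH0 (1 << 3)   /* current (0 offset) cache-line */
#define EX_CACHE_PREFETCH1 (1 << 4)   /* next (line size offset) cache-line */

/* Map a prefetch option code (0..3) to option flag bits.
 * Returns -1 for codes outside the documented range. */
static int ex_prefetch_flags(int prefetch)
{
    if (prefetch < 0 || prefetch > 3)
        return -1;
    int flags = 0;
    if (prefetch & 1)
        flags |= EX_CACHE_PREFETCH0;
    if (prefetch & 2)
        flags |= EX_CACHE_PREFETCH1;
    return flags;
}
```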
__C__ SPHLFEntryDirect_t SPHMPSCQGetNextCompleteDirect ( SPHMPMCQ_t  queue)

Allows the consumer to get the next completed queue entry from the specified multi producer multi consumer queue. This function uses no synchronization, so only safe with external synchronization, or for use by a single consumer, thus the "MPSC" moniker.

Returns a direct entry handle which allows the application to access the application specific data inserted by the producer thread. If the specified queue is empty or the next queue entry is not yet completed the get may fail.

Parameters
queue  Handle of a producer consumer queue.
Returns
Direct handle of the queue entry, or 0 (NULL) if the get failed. For example, the get may fail if the queue is empty or the next tail entry is not yet completed.
__C__ SPHLFEntryDirect_t SPHSPMCQAllocStrideDirect ( SPHMPMCQ_t  queue)

Allows the producer thread to allocate and initialize the header of a queue entry for access. The allocation is from the specified Multi Producer Multi Consumer Queue. This function uses no synchronization, so only safe with external synchronization, or for use by a single producer, thus the "SPMC" moniker.

The allocation size is the stride set when the PC queue was initialized/created. The entry status and length are stored in the header of the new entry. Returns a direct entry handle which allows the application to insert application specific data into the entry via the sphlfentry.h API. If the specified queue is full the allocation may fail.

Note
The queue entry is not ready for access by the Consumer thread, until additional application data is inserted and the entry is completed (via SPHLFEntryDirectComplete). Category and Subcategory may be supplied as the entry is completed.
Parameters
queue  Handle of a producer consumer queue.
Returns
Direct handle of the initialized queue entry, or 0 (NULL) if the allocation failed. For example, the allocation may fail if the queue is full.