-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathmqueue.h
More file actions
139 lines (128 loc) · 3.75 KB
/
Copy pathmqueue.h
File metadata and controls
139 lines (128 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#ifndef __MQUEUE_H__
#define __MQUEUE_H__
/**
* Copyright (c) 2014, Zhiyong Liu <Neesenk at gmail dot com>
* All rights reserved.
*/
#ifdef __cplusplus
extern "C" {
#endif // __cplusplus
/**
* 无锁队列实现:
*
* mqueue和mqueuebatch支持多个读者和多个写者并发操作的的无锁队列
*
* mqueue 接口和mqueuebatch接口的区别:
* mqueue能够保证入队和出队一致, mqueuebatch只能保证一个线程下的出队和入队是一致的
* mqueuebatch的性能是mqueue的性能的cpu个数倍以上
*
* mqueue example:
*
* struct data { uint32_t a; uint64_t b; };
* struct mqueue *q = mqueue_create(655360, sizeof(struct data));
* assert(q != NULL);
*
* writer:
* struct data *data = NULL;
* if ((data = (struct data *)mqueue_writer_parpare(q)) != NULL) {
* ...
* mqueue_writer_commit(q, data);
* } else {
* ...
* }
*
* reader:
* int n = 0;
* struct reader_result res;
* if ((n = mqueue_reader_parpare(q, &res)) > 0) {
* int i = 0;
* for (i = 0; i < n; i++) {
* struct data *data = mqueue_reader_next(&res);
* assert(data != NULL);
* ...
* }
*
* mqueue_reader_commit(q, &res);
* } else {
* ...
* }
*
* mqueue_destroy(q);
*
* queuebatch example:
*
* struct data { uint32_t a; uint64_t b; };
* struct mqueuebatch *qs = mqueuebatch_create(655360, sizeof(struct data));
* assert(qs != NULL);
*
* writer:
* struct data *data = NULL;
* struct mqueuebatch_writer w;
* mqueuebatch_writer_init(&w, qs);
*
* if ((data = (struct data *)mqueuebatch_writer_parpare(&w)) != NULL) {
* ...
* mqueue_writer_commit(&w, data);
* } else {
* ...
* }
*
* reader:
* int n = 0;
* struct reader_result res;
* struct mqueuebatch r;
* mqueuebatch_reader_init(&r, qs);
* if ((n = mqueuebatch_reader_parpare(&r, &res)) > 0) {
* int i = 0;
* for (i = 0; i < n; i++) {
* struct data *data = mqueuebatch_reader_next(&res);
* assert(data != NULL);
* ...
* }
*
* mqueuebatch_reader_commit(&r, &res);
* } else {
* ...
* }
*
* mqueuebatch_destroy(qs);
*/
struct item;
struct mqueue;
struct mqueuebatch;
// 读进程获取到的结果
struct reader_result {
size_t nmemb; // 包含了多少条记录
struct mqueue *q; // 所属的queue
struct item *header;// 头部
struct item *tail; // 尾部
struct item *curr; // 当前遍历到的节点
};
struct mqueuebatch_reader {
int queueid;
struct mqueuebatch *queuebatch;
};
struct mqueuebatch_writer {
struct mqueue *queue;
struct mqueuebatch *queuebatch;
};
struct mqueue *mqueue_create(size_t nmemb, size_t size);
void mqueue_destroy(struct mqueue *q);
void *mqueue_writer_parpare(struct mqueue *q);
void mqueue_writer_commit(struct mqueue *q, void *ptr);
void *mqueue_reader_next(struct reader_result *res);
size_t mqueue_reader_parpare(struct mqueue *q, struct reader_result *ret);
void mqueue_reader_commit(struct mqueue *q, struct reader_result *res);
void mqueuebatch_reader_init(struct mqueuebatch_reader *reader, struct mqueuebatch *qs);
void mqueuebatch_writer_init(struct mqueuebatch_writer *writer, struct mqueuebatch *qs);
struct mqueuebatch *mqueuebatch_create(size_t nmemb, size_t size);
void mqueuebatch_destroy(struct mqueuebatch *qs);
void *mqueuebatch_writer_parpare(struct mqueuebatch_writer *writer);
void mqueuebatch_writer_commit(struct mqueuebatch_writer *writer, void *ptr);
void *mqueuebatch_reader_next(struct reader_result *res);
size_t mqueuebatch_reader_parpare(struct mqueuebatch_reader *reader, struct reader_result *ret);
void mqueuebatch_reader_commit(struct mqueuebatch_reader *reader, struct reader_result *res);
#ifdef __cplusplus
}
#endif // __cplusplus
#endif // __MQUEUE_H__