1 module dzmq.concurrency_mixins;
2 
3 
4 mixin template DefaultConcurrencyPolicy() {
5 
6     mixin template SocketClose() {}
7     mixin template SocketBind() {}
8     mixin template SocketConnect() {}
9 
10     mixin template WriteableSocket() {
11 
12         void send(string[] frames) {
13             scope (exit) { _busy = false; }
14             _busy = true;
15 
16             zmq_msg_t req;
17             int rc;
18 
19             if (frames.length==1) {
20                 rc = zmq_send(_sock, frames[0].ptr, frames[0].length, 0);
21                 if (rc<0) throw new ZmqException("Error on write");
22                 return;
23             }
24 
25             foreach(frame; frames[0..frames.length-1]){
26                 rc = zmq_send(_sock, frame.ptr, frame.length, ZMQ_SNDMORE);
27                 if (rc<0) throw new ZmqException("Error on write");
28             }
29             rc = zmq_send(_sock, frames[frames.length-1].ptr, frames[frames.length-1].length, 0);
30             if (rc<0) throw new ZmqException("Error on write");
31         }
32     }
33 
34     mixin template ReadableSocket() {
35         string[] receive() {
36             scope (exit) { _busy = false; }
37             _busy = true;
38 
39             char* buffer;
40             int rc;
41             zmq_msg_t frame;
42             int more;
43             size_t more_size = more.sizeof;
44 
45             scope string[] frames;
46             do {
47                 zmq_msg_init(&frame);
48                 rc = zmq_recvmsg(_sock, &frame, 0);
49                 if (rc<0) throw new ZmqException("Error on read");
50                 buffer = (cast(char*)zmq_msg_data(&frame));
51                 frames ~= buffer[0..zmq_msg_size(&frame)].to!string;
52                 zmq_getsockopt(_sock, ZMQ_RCVMORE, &more, &more_size);
53                 zmq_msg_close(&frame);
54             } while(more);
55 
56             return frames.dup;
57         }
58     }
59 }
60 
61 version(Have_vibe_d) {
62     pragma(msg, "Compiling dzmq with vibe.d support");
63 
64     public import vibe.d;
65     public import vibe.core.core;
66     public import core.time;
67 
68     mixin template VibeDConcurrencyPolicy() {
69 
70         private {
71             FileDescriptorEvent _fd_read_evt;
72             FileDescriptorEvent _fd_write_evt;
73             int _fd;
74             TaskCondition _readCondition;
75             TaskMutex _readMutex;
76             TaskMutex _writeMutex;
77             Task _reader;
78         }
79 
80         void __onClose() {
81             _fd_read_evt.destroy();
82             _fd_write_evt.destroy();
83         }
84 
85         void __onConnect() {
86             size_t fd_size = _fd.sizeof;
87             zmq_getsockopt(_sock, ZMQ_FD, &_fd, &fd_size);
88             _fd_read_evt = createFileDescriptorEvent(_fd, FileDescriptorEvent.Trigger.read, false);
89             _fd_write_evt = createFileDescriptorEvent(_fd, FileDescriptorEvent.Trigger.write, false);
90 
91             _readMutex = new TaskMutex;
92             _writeMutex = new TaskMutex;
93             _readCondition = new TaskCondition(_readMutex);
94         }
95 
96         void __onBind() {
97             size_t fd_size = _fd.sizeof;
98             zmq_getsockopt(_sock, ZMQ_FD, &_fd, &fd_size);
99             _fd_read_evt = createFileDescriptorEvent(_fd, FileDescriptorEvent.Trigger.read, false);
100             _fd_write_evt = createFileDescriptorEvent(_fd, FileDescriptorEvent.Trigger.write, false);
101 
102             _readMutex = new TaskMutex;
103             _writeMutex = new TaskMutex;
104             _readCondition = new TaskCondition(_readMutex);
105         }
106 
107         mixin template WriteableSocket() {
108             void send(string[] frames, Duration timeout=100.msecs) {
109 		        scope(exit) _busy=false;
110 
111                 synchronized(_writeMutex) {
112                     _busy = true;
113                     if (!_fd_write_evt.wait(timeout, FileDescriptorEvent.Trigger.write)){
114                         throw new ZmqTimeout();
115                     }
116                     zmq_msg_t req;
117                     int rc;
118 
119 
120                     if (frames.length==1) {
121                         rc = zmq_send(_sock, frames[0].ptr, frames[0].length, 0);
122                         if (rc<0) throw new ZmqException("Error on write");
123                         return;
124                     } else {
125                         foreach(part; frames[0..frames.length-1]){
126                             rc = zmq_send(_sock, part.ptr, part.length, ZMQ_SNDMORE);
127                             if (rc<0) throw new ZmqException("Error on write");
128                         }
129                         rc = zmq_send(_sock, frames[frames.length-1].ptr, frames[frames.length-1].length, 0);
130                         if (rc<0) throw new ZmqException("Error on write");
131                     }
132 
133                     // we need that to reset the zmq_events as per the zmq manual
134                     int events;
135                     size_t events_size = events.sizeof;
136                     zmq_getsockopt(_sock, ZMQ_EVENTS, &events, &events_size);
137                 }
138             }
139         }
140 
141         mixin template ReadableSocket() {
142             string[] receive(Duration timeout=100.msecs) {
143                 scope(exit) _busy=false;
144 
145                 synchronized(_readMutex) {
146                     _busy = true;
147                     char* buffer;
148                     int rc;
149                     zmq_msg_t frame;
150                     int more;
151                     size_t more_size = more.sizeof;
152 
153                     do {
154                         _fd_read_evt.wait(FileDescriptorEvent.Trigger.read);
155                         if (!can_read ) { continue;	}
156 
157                         scope string[] frames;
158                         do {
159                             zmq_msg_init(&frame);
160                             rc = zmq_recvmsg(_sock, &frame, 0);
161                             if (rc<0) throw new ZmqException("Error on read");
162                             buffer = (cast(char*)zmq_msg_data(&frame));
163                             frames ~= buffer[0..zmq_msg_size(&frame)].to!string;
164                             zmq_getsockopt(_sock, ZMQ_RCVMORE, &more, &more_size);
165                             zmq_msg_close(&frame);
166                         } while(more && can_read);
167 
168                         return frames.dup;
169                     } while(!can_read);
170 
171                     assert(0);
172                 }
173                 
174             }
175         }
176     }
177 }