module dzmq.concurrency_mixins;


/**
 * Blocking concurrency policy.
 *
 * The nested templates are meant to be mixed into a socket type that supplies
 * the `_sock` handle, the `_busy` flag, the libzmq bindings, `ZmqException`
 * and `std.conv.to`; none of those are declared in this module.
 */
mixin template DefaultConcurrencyPolicy() {

    // This policy needs no extra work on close / bind / connect.
    mixin template SocketClose() {}
    mixin template SocketBind() {}
    mixin template SocketConnect() {}

    mixin template WriteableSocket() {

        /**
         * Sends `frames` as a single (possibly multipart) message.
         *
         * Every frame except the last is sent with ZMQ_SNDMORE. `_busy` is
         * set for the duration of the call and cleared on every exit path.
         *
         * Params:
         *   frames = message parts, in order; must contain at least one frame
         *
         * Throws: ZmqException if `frames` is empty or zmq_send reports an error.
         */
        void send(string[] frames) {
            scope (exit) { _busy = false; }
            _busy = true;

            // Without this guard the "all but the last" slice below would
            // underflow on an empty array.
            if (frames.length == 0) {
                throw new ZmqException("Error on write: no frames to send");
            }

            foreach (i, frame; frames) {
                // Only the final frame is sent without the "more" flag.
                int flags = (i + 1 < frames.length) ? ZMQ_SNDMORE : 0;
                int rc = zmq_send(_sock, frame.ptr, frame.length, flags);
                if (rc < 0) throw new ZmqException("Error on write");
            }
        }
    }

    mixin template ReadableSocket() {

        /**
         * Receives one complete message and returns its frames in order.
         *
         * Frames are read until ZMQ_RCVMORE reports that no further parts
         * follow. Each frame's bytes are copied into a fresh string before
         * the underlying zmq message is closed.
         *
         * Throws: ZmqException if receiving a frame or querying ZMQ_RCVMORE fails.
         */
        string[] receive() {
            scope (exit) { _busy = false; }
            _busy = true;

            int more;
            size_t more_size = more.sizeof;
            string[] frames;

            do {
                zmq_msg_t frame;
                zmq_msg_init(&frame);
                // Release the message on every path, including the throws below.
                scope (exit) zmq_msg_close(&frame);

                if (zmq_recvmsg(_sock, &frame, 0) < 0) {
                    throw new ZmqException("Error on read");
                }

                auto buffer = cast(char*) zmq_msg_data(&frame);
                frames ~= buffer[0 .. zmq_msg_size(&frame)].to!string;

                // A failed query would leave `more` stale, so treat it as a read error.
                if (zmq_getsockopt(_sock, ZMQ_RCVMORE, &more, &more_size) < 0) {
                    throw new ZmqException("Error on read");
                }
            } while (more);

            return frames;
        }
    }
}

version(Have_vibe_d) {
    pragma(msg, "Compiling dzmq with vibe.d support");

    public import vibe.d;
    public import vibe.core.core;
    public import core.time;

    /**
     * vibe.d-based concurrency policy.
     *
     * Waits on the socket's ZMQ_FD file descriptor through vibe.d
     * FileDescriptorEvents and serialises senders and receivers with
     * separate TaskMutexes. Like the default policy, it relies on the
     * mix-in site for `_sock`, `_busy`, `can_read`, `ZmqException`,
     * `ZmqTimeout` and the libzmq bindings.
     */
    mixin template VibeDConcurrencyPolicy() {

        private {
            FileDescriptorEvent _fd_read_evt;
            FileDescriptorEvent _fd_write_evt;
            int _fd;
            TaskCondition _readCondition;
            TaskMutex _readMutex;
            TaskMutex _writeMutex;
            Task _reader;
        }

        /// Destroys the read and write file-descriptor events.
        void __onClose() {
            _fd_read_evt.destroy();
            _fd_write_evt.destroy();
        }

        /// Creates the file-descriptor events and task locks after a connect.
        void __onConnect() {
            _initFdEventsAndLocks();
        }

        /// Creates the file-descriptor events and task locks after a bind.
        void __onBind() {
            _initFdEventsAndLocks();
        }

        /**
         * Shared setup for __onConnect and __onBind: fetches the socket's
         * ZMQ_FD descriptor, registers read/write events on it and creates
         * the mutexes and the read condition.
         *
         * Throws: ZmqException if the descriptor cannot be retrieved.
         */
        private void _initFdEventsAndLocks() {
            size_t fd_size = _fd.sizeof;
            // Do not register events on a descriptor we failed to obtain.
            if (zmq_getsockopt(_sock, ZMQ_FD, &_fd, &fd_size) < 0) {
                throw new ZmqException("Error retrieving socket file descriptor");
            }
            _fd_read_evt = createFileDescriptorEvent(_fd, FileDescriptorEvent.Trigger.read, false);
            _fd_write_evt = createFileDescriptorEvent(_fd, FileDescriptorEvent.Trigger.write, false);

            _readMutex = new TaskMutex;
            _writeMutex = new TaskMutex;
            _readCondition = new TaskCondition(_readMutex);
        }

        mixin template WriteableSocket() {

            /**
             * Sends `frames` as a single (possibly multipart) message while
             * holding the write mutex.
             *
             * Params:
             *   frames  = message parts, in order; must contain at least one frame
             *   timeout = how long to wait for the write event before giving up
             *
             * Throws:
             *   ZmqTimeout if the write event does not fire within `timeout`;
             *   ZmqException if `frames` is empty or zmq_send reports an error.
             */
            void send(string[] frames, Duration timeout=100.msecs) {
                scope(exit) _busy = false;

                // Without this guard the "all but the last" logic would
                // underflow on an empty array.
                if (frames.length == 0) {
                    throw new ZmqException("Error on write: no frames to send");
                }

                synchronized(_writeMutex) {
                    _busy = true;
                    if (!_fd_write_evt.wait(timeout, FileDescriptorEvent.Trigger.write)) {
                        throw new ZmqTimeout();
                    }

                    foreach (i, part; frames) {
                        // Only the final frame is sent without the "more" flag.
                        int flags = (i + 1 < frames.length) ? ZMQ_SNDMORE : 0;
                        int rc = zmq_send(_sock, part.ptr, part.length, flags);
                        if (rc < 0) throw new ZmqException("Error on write");
                    }

                    // we need that to reset the zmq_events as per the zmq manual.
                    // This must run for single-frame messages as well, so there
                    // is deliberately no early return above.
                    int events;
                    size_t events_size = events.sizeof;
                    zmq_getsockopt(_sock, ZMQ_EVENTS, &events, &events_size);
                }
            }
        }

        mixin template ReadableSocket() {

            /**
             * Waits for the read event and returns the frames of one message
             * while holding the read mutex.
             *
             * Frames are read while ZMQ_RCVMORE is set and `can_read` stays
             * true, so the result may stop short if `can_read` turns false
             * part-way through a message.
             *
             * NOTE(review): `timeout` is accepted but never used; the wait
             * below has no deadline. Confirm whether callers rely on the
             * blocking behaviour before wiring it in.
             *
             * Throws: ZmqException if receiving a frame or querying ZMQ_RCVMORE fails.
             */
            string[] receive(Duration timeout=100.msecs) {
                scope(exit) _busy = false;

                synchronized(_readMutex) {
                    _busy = true;
                    int more;
                    size_t more_size = more.sizeof;

                    do {
                        _fd_read_evt.wait(FileDescriptorEvent.Trigger.read);
                        // `continue` re-evaluates the loop condition, so we
                        // wait again while nothing is readable.
                        if (!can_read) { continue; }

                        string[] frames;
                        do {
                            zmq_msg_t frame;
                            zmq_msg_init(&frame);
                            // Release the message on every path, including the throws below.
                            scope(exit) zmq_msg_close(&frame);

                            if (zmq_recvmsg(_sock, &frame, 0) < 0) {
                                throw new ZmqException("Error on read");
                            }

                            auto buffer = cast(char*) zmq_msg_data(&frame);
                            frames ~= buffer[0 .. zmq_msg_size(&frame)].to!string;

                            // A failed query would leave `more` stale, so treat it as a read error.
                            if (zmq_getsockopt(_sock, ZMQ_RCVMORE, &more, &more_size) < 0) {
                                throw new ZmqException("Error on read");
                            }
                        } while (more && can_read);

                        return frames;
                    } while (!can_read);

                    // Unreachable: the loop above either returns or keeps waiting.
                    assert(0);
                }
            }
        }
    }
}