/// Thin object-oriented wrapper around the ZeroMQ C API (via the deimos bindings).
module dzmq.socket;

import dzmq.concurrency_mixins;

import deimos.zmq.zmq;
import std.conv: to;
import std.string;
import std.algorithm: find, remove;
import std.stdio;
import std.exception;

/// Socket kinds; each member carries the value of the matching `ZMQ_*` constant.
enum ZmqSocketType {
    Pair   = ZMQ_PAIR,
    Pub    = ZMQ_PUB,
    Sub    = ZMQ_SUB,
    Req    = ZMQ_REQ,
    Rep    = ZMQ_REP,
    Dealer = ZMQ_DEALER,
    Router = ZMQ_ROUTER,
    Pull   = ZMQ_PULL,
    Push   = ZMQ_PUSH,
    XPub   = ZMQ_XPUB,
    XSub   = ZMQ_XSUB
}

/// Exception type for timed-out operations (not thrown anywhere in this module).
class ZmqTimeout: Exception {
    this(string msg = "Operation timedout") {
        super(msg);
    }
}

/// Error raised when a ZeroMQ call fails; `ErrnoException` captures `errno`
/// at construction time, so construct it right after the failing call.
class ZmqException: ErrnoException {
    this(string msg) {
        super(msg);
    }
}

/// Wraps a native ZeroMQ context handle and acts as a factory for sockets.
class ZmqContext {
    private {
        immutable void* _ctx;
        bool _terminated = false;

        // Wraps an already-created native handle (used by `dup`).
        this(immutable void* ctx) {
            _ctx = ctx;
        }
    }

    /**
     * Creates a new native context.
     *
     * Params:
     *   threads = value forwarded to `zmq_init`
     * Throws: ZmqException if `zmq_init` returns null.
     */
    this(int threads = 1) {
        _ctx = cast(immutable(void*)) zmq_init(threads);
        if (_ctx is null) {
            // Nothing to terminate later; keep the destructor from calling
            // zmq_term on a null handle.
            _terminated = true;
            throw new ZmqException("Error creating context");
        }
    }

    ~this() {
        terminate();
    }

    /**
     * Returns a second `ZmqContext` object wrapping the same native handle.
     *
     * NOTE(review): the copy has its own `_terminated` flag, so the original
     * and the copy will each call `zmq_term` on the shared handle when they
     * are terminated/destroyed. Confirm whether that is intended.
     */
    auto dup() {
        return new ZmqContext(_ctx);
    }

    /// Calls `zmq_term` on the handle once; later calls on this object are no-ops.
    void terminate() {
        if (_terminated) return;
        zmq_term(cast(void*) _ctx);
        debug { writeln("Terminated ctx"); }
        _terminated = true;
    }

    /**
     * Creates a socket of the given type on this context.
     *
     * Throws: ZmqException if `zmq_socket` returns null.
     */
    auto ref socket(ZmqSocketType sockType, alias ConcurrencyPolicy = DefaultConcurrencyPolicy)() {
        void* handle = zmq_socket(cast(void*) _ctx, cast(int) sockType);
        if (handle is null) throw new ZmqException("Error creating socket");
        return new ZmqSocket!(sockType, ConcurrencyPolicy)(handle);
    }
}

// Adds `can_write`: reads the socket's ZMQ_EVENTS option and tests the
// ZMQ_POLLOUT bit. The return code of zmq_getsockopt is not checked.
private mixin template WritableProperties() {
    @property bool can_write() {
        int events;
        size_t events_size = events.sizeof;
        zmq_getsockopt(_sock, ZMQ_EVENTS, &events, &events_size);
        return (events & ZMQ_POLLOUT) == ZMQ_POLLOUT;
    }
}

// Adds `can_read`: reads the socket's ZMQ_EVENTS option and tests the
// ZMQ_POLLIN bit. The return code of zmq_getsockopt is not checked.
private mixin template ReadableProperties() {
    @property bool can_read() {
        int events;
        size_t events_size = events.sizeof;
        zmq_getsockopt(_sock, ZMQ_EVENTS, &events, &events_size);
        return (events & ZMQ_POLLIN) == ZMQ_POLLIN;
    }
}

/**
 * A ZeroMQ socket whose available operations depend on `type`.
 *
 * `ConcurrencyPolicy` is mixed into the class; if it defines `__onConnect`,
 * `__onBind` or `__onClose`, the corresponding hook is called from
 * `connect`, `bind` and `close`.
 */
class ZmqSocket(ZmqSocketType type, alias ConcurrencyPolicy = DefaultConcurrencyPolicy) {
    mixin ConcurrencyPolicy;

    private {
        void* _sock;
        ZmqSocketType _type = type;
        bool _closed = false;
        bool _busy = true;

        this(void* socket) {
            _sock = socket;
        }

        // Closes the native socket once and returns zmq_close's result
        // (0 if the socket was already closed). Does not throw by itself,
        // so it can be shared by close() and the destructor.
        int closeSocket() {
            if (_closed) return 0;
            // Guard on the hook that is actually called.
            static if (is(typeof(__onClose))) __onClose();
            debug { writeln("Closed socket"); }
            immutable rc = zmq_close(_sock);
            _closed = true;
            return rc;
        }
    }

    ~this() {
        // Do not throw (or allocate an exception) from a destructor: it may
        // run during GC finalization. A failed close is ignored here.
        closeSocket();
    }

    /**
     * Closes the socket; calling it again is a no-op.
     *
     * Throws: ZmqException if `zmq_close` reports an error.
     */
    void close() {
        if (_closed) return;
        if (closeSocket() != 0) throw new ZmqException("Error on close");
    }

    /**
     * Connects the socket to `endpoint`.
     *
     * Throws: ZmqException if `zmq_connect` reports an error.
     */
    void connect(string endpoint) {
        int rc = zmq_connect(_sock, endpoint.toStringz);
        // writefln (not writeln) so that %d is actually formatted; debug-only
        // like the other diagnostic output in this module.
        debug { writefln("Connecting rc is %d", rc); }
        if (rc != 0) throw new ZmqException("Error on connecting");

        static if (is(typeof(__onConnect))) __onConnect();
    }

    /**
     * Disconnects the socket from `endpoint`.
     *
     * Throws: ZmqException if `zmq_disconnect` reports an error.
     */
    void disconnect(string endpoint = null) {
        int rc = zmq_disconnect(_sock, endpoint.toStringz);
        if (rc != 0) throw new ZmqException("Error on disconnect");
    }

    /**
     * Binds the socket to `endpoint`.
     *
     * Throws: ZmqException if `zmq_bind` reports an error.
     */
    void bind(string endpoint) {
        int rc = zmq_bind(_sock, endpoint.toStringz);
        if (rc != 0) throw new ZmqException("Error on binding");

        // Guard on the hook that is actually called.
        static if (is(typeof(__onBind))) __onBind();
    }

    /**
     * Unbinds the socket from `endpoint`.
     *
     * Throws: ZmqException if `zmq_unbind` reports an error.
     */
    void unbind(string endpoint) {
        int rc = zmq_unbind(_sock, endpoint.toStringz);
        if (rc != 0) throw new ZmqException("Error on unbind");
    }

    /// Returns the `_busy` flag (initialised to true; not modified in this class body).
    @property bool isBusy() { return _busy; }

    /**
     * Sets the ZMQ_IDENTITY socket option.
     *
     * Throws: ZmqException if `zmq_setsockopt` reports an error.
     */
    @property void identity(string ident) {
        if (zmq_setsockopt(_sock, ZMQ_IDENTITY, ident.ptr, ident.length) != 0)
            throw new ZmqException("Error on setting identity");
    }

    // Pub/XPub/Push get only the write side, Sub/XSub/Pull only the read
    // side, every other socket type gets both.
    static if (type == ZmqSocketType.Pub || type == ZmqSocketType.XPub || type == ZmqSocketType.Push) {
        mixin WritableProperties;
        mixin WriteableSocket;
    } else static if (type == ZmqSocketType.Sub || type == ZmqSocketType.XSub || type == ZmqSocketType.Pull) {
        mixin ReadableProperties;
        mixin ReadableSocket;
    } else {
        mixin WritableProperties;
        mixin WriteableSocket;

        mixin ReadableProperties;
        mixin ReadableSocket;
    }

    static if (type == ZmqSocketType.Sub) {
        /**
         * Sets the ZMQ_SUBSCRIBE option to `sub`.
         *
         * Throws: ZmqException if `zmq_setsockopt` reports an error.
         */
        void subscribe(string sub) {
            if (zmq_setsockopt(_sock, ZMQ_SUBSCRIBE, sub.ptr, sub.length) != 0)
                throw new ZmqException("Error on subscribe");
        }
    }
}

version(unittest) {
    import std.concurrency;
    import std.stdio;
    import core.thread;

    // Subscriber side of the test: waits for a start signal from the owner,
    // connects a Sub socket, tells the owner it is connected, then reports
    // back once a message has been received.
    void start_reading(shared ZmqContext ctx) {
        receive(
            (bool start) { writeln("starting"); }
        );

        auto inctx = cast(ZmqContext) ctx;
        auto subsock = inctx.socket!(ZmqSocketType.Sub)();
        subsock.subscribe("");
        writeln("connecting");
        subsock.connect("tcp://localhost:54321");
        send(ownerTid, true);
        auto res = subsock.receive();
        writeln(res);

        send(ownerTid, true);
    }

    unittest {
        auto ctx = new ZmqContext();
        auto task = spawn(&start_reading, cast(shared) ctx);

        auto sock = ctx.socket!(ZmqSocketType.Pub)();
        sock.bind("tcp://*:54321");
        send(task, true);
        auto read = receiveOnly!(bool);
        writeln("writing");
        // Short pause between the subscriber's "connected" signal and the publish.
        Thread.sleep(1.msecs);
        sock.send(["help", "me"]);

        auto success = receiveOnly!(bool);
        assert(success);
    }
}