1 module dzmq.socket;
2 
3 import dzmq.concurrency_mixins;
4 
5 import deimos.zmq.zmq;
6 import std.conv: to;
7 import std.string;
8 import std.algorithm: find, remove;
9 import std.stdio;
10 import std.exception;
11 
/// Socket kinds accepted by `ZmqContext.socket` and `ZmqSocket`.
/// Every member takes the value of the identically named ZMQ_* constant
/// imported from `deimos.zmq.zmq`.
enum ZmqSocketType
{
    Pair   = ZMQ_PAIR,
    Pub    = ZMQ_PUB,
    Sub    = ZMQ_SUB,
    Req    = ZMQ_REQ,
    Rep    = ZMQ_REP,
    Dealer = ZMQ_DEALER,
    Router = ZMQ_ROUTER,
    Pull   = ZMQ_PULL,
    Push   = ZMQ_PUSH,
    XPub   = ZMQ_XPUB,
    XSub   = ZMQ_XSUB,
}
25 
/// Exception type used to signal that an operation timed out.
class ZmqTimeout: Exception {
    /**
     * Params:
     *   msg  = human readable description; defaults to "Operation timed out"
     *   file = source file recorded in the exception (defaults to the caller's file)
     *   line = source line recorded in the exception (defaults to the caller's line)
     */
    this(string msg = "Operation timed out", string file = __FILE__, size_t line = __LINE__){
        // Forward file/line so the exception points at the throw site instead
        // of at this constructor.
        super(msg, file, line);
    }
}
31 
/**
 * Error raised when a ZeroMQ call reports failure.
 *
 * Derives from `std.exception.ErrnoException`, so the value of `errno` at
 * construction time is captured and its description is appended to `msg`.
 * NOTE(review): ZeroMQ-specific error codes may not have a description in the
 * C runtime; `zmq_strerror` could be used for those - confirm whether needed.
 */
class ZmqException: ErrnoException {
    /**
     * Params:
     *   msg  = short description of the failed operation
     *   file = source file recorded in the exception (defaults to the caller's file)
     *   line = source line recorded in the exception (defaults to the caller's line)
     */
    this(string msg, string file = __FILE__, size_t line = __LINE__) {
        // ErrnoException defaults file/line to null/0, so they must be
        // forwarded explicitly for the exception to carry a location.
        super(msg, file, line);
    }
}
37 
/**
 * Wrapper around a ZeroMQ context handle obtained from `zmq_init`.
 *
 * The instance created through the public constructor owns the handle and
 * terminates it from its destructor. Instances produced by `dup()` share the
 * same handle but do not own it: their destructor leaves the context alone so
 * that discarding a copy cannot terminate the context under the original.
 */
class ZmqContext {
    private {
        immutable void * _ctx;
        bool _terminated = false;
        // True only for the instance that created the native context.
        bool _owner = true;

        // Wraps an already existing context handle without taking ownership.
        this (immutable void * ctx) {
            _ctx = ctx;
            _owner = false;
        }
    }

    /**
     * Creates a new native context.
     *
     * Params:
     *   threads = value passed to `zmq_init`
     * Throws: ZmqException when `zmq_init` returns null.
     */
    this(int threads=1){
        auto raw = zmq_init(threads);
        if (raw is null) throw new ZmqException("Error on context creation");
        _ctx = cast(immutable(void*))raw;
    }

    /// Terminates the native context if this instance owns it.
    ~this() {
        if (_owner) terminate();
    }

    /// Returns a new, non-owning `ZmqContext` that refers to the same native handle.
    auto dup() {
        return new ZmqContext(_ctx);
    }

    /**
     * Calls `zmq_term` on the handle the first time it is invoked on this
     * instance; later calls on the same instance do nothing.
     *
     * The return value of `zmq_term` is not inspected. The "terminated" flag
     * is per instance, so instances created by `dup()` do not learn that the
     * shared handle has been terminated.
     */
    void terminate(){
        if (_terminated) return;
        zmq_term(cast(void*)_ctx);
        debug { writeln("Terminated ctx"); }
        _terminated = true;
    }

    /**
     * Creates a socket of the given type inside this context.
     *
     * Params:
     *   sockType          = kind of socket to create (template argument)
     *   ConcurrencyPolicy = mixin template forwarded to `ZmqSocket` (template argument)
     * Returns: a new `ZmqSocket!(sockType, ConcurrencyPolicy)`.
     * Throws: ZmqException when `zmq_socket` returns null.
     */
    auto ref socket(ZmqSocketType sockType, alias ConcurrencyPolicy = DefaultConcurrencyPolicy)(){
        auto raw = zmq_socket(cast(void*)_ctx, cast(int)sockType);
        // Refuse to wrap a null handle; every later call on it would fail.
        if (raw is null) throw new ZmqException("Error on socket creation");
        return new ZmqSocket!(sockType, ConcurrencyPolicy)(raw);
    }
}
70 
/// Adds the `can_write` property to a type that has a `_sock` handle.
private mixin template WritableProperties() {
    /// Queries ZMQ_EVENTS on `_sock` and reports whether the ZMQ_POLLOUT
    /// flag is set. The return code of `zmq_getsockopt` is not inspected;
    /// the flags variable starts at 0.
    @property bool can_write() {
        int pendingFlags;
        size_t optionLength = pendingFlags.sizeof;
        zmq_getsockopt(_sock, ZMQ_EVENTS, &pendingFlags, &optionLength);

        immutable writable = (pendingFlags & ZMQ_POLLOUT) == ZMQ_POLLOUT;
        return writable;
    }
}
79 
/// Adds the `can_read` property to a type that has a `_sock` handle.
private mixin template ReadableProperties() {
    /// Queries ZMQ_EVENTS on `_sock` and reports whether the ZMQ_POLLIN
    /// flag is set. The return code of `zmq_getsockopt` is not inspected;
    /// the flags variable starts at 0.
    @property bool can_read() {
        int pendingFlags;
        size_t optionLength = pendingFlags.sizeof;
        zmq_getsockopt(_sock, ZMQ_EVENTS, &pendingFlags, &optionLength);

        immutable readable = (pendingFlags & ZMQ_POLLIN) == ZMQ_POLLIN;
        return readable;
    }
}
88 
/**
 * Typed wrapper around a native ZeroMQ socket handle.
 *
 * Params:
 *   type              = socket kind; selects which read/write mixins are included
 *   ConcurrencyPolicy = mixin template mixed into the class. When it declares
 *                       `__onConnect`, `__onBind` or `__onClose`, the matching
 *                       hook is invoked from `connect`, `bind` and `close`.
 *
 * Instances are created through `ZmqContext.socket`; the constructor is private.
 */
class ZmqSocket(ZmqSocketType type, alias ConcurrencyPolicy = DefaultConcurrencyPolicy) {
    mixin ConcurrencyPolicy;

    private {
        void * _sock;
        ZmqSocketType _type = type;
        bool _closed = false;
        bool _busy = true;

        // Wraps an existing native socket handle.
        this(void * socket) {
            _sock = socket;
        }
    }

    /// Closes the socket if `close()` has not completed before.
    ~this() {
        close();
    }

    /**
     * Runs the policy's `__onClose` hook (when declared) and closes the native
     * socket. Calls after the first `zmq_close` attempt do nothing.
     *
     * Throws: ZmqException when `zmq_close` reports an error.
     */
    void close() {
        if (_closed) return;
        // Guard on the hook that is actually called.
        static if(is(typeof(__onClose))) __onClose();
        debug { writeln("Closed socket"); }
        // Mark as closed before calling zmq_close so that neither a second
        // explicit call nor the destructor closes the handle twice.
        _closed = true;
        int rc = zmq_close(_sock);
        if (rc!=0) throw new ZmqException("Error on close");
    }

    /**
     * Connects the socket to `endpoint`, then runs the policy's `__onConnect`
     * hook when declared.
     *
     * Throws: ZmqException when `zmq_connect` reports an error.
     */
    void connect(string endpoint) {
        int rc = zmq_connect(_sock, endpoint.toStringz);
        // Diagnostic output only in debug builds, like the other messages in
        // this module; writefln is required for the %d placeholder.
        debug { writefln("Connecting rc is %d", rc); }
        if (rc!=0) throw new ZmqException("Error on connecting");

        static if(is(typeof(__onConnect))) __onConnect();
    }

    /**
     * Disconnects the socket from `endpoint`.
     *
     * Throws: ZmqException when `zmq_disconnect` reports an error.
     */
    void disconnect(string endpoint = null) {
        int rc = zmq_disconnect(_sock, endpoint.toStringz);
        if (rc!=0) throw new ZmqException("Error on disconnect");
    }

    /**
     * Binds the socket to `endpoint`, then runs the policy's `__onBind` hook
     * when declared.
     *
     * Throws: ZmqException when `zmq_bind` reports an error.
     */
    void bind(string endpoint) {
        int rc = zmq_bind(_sock, endpoint.toStringz);
        if (rc!=0) throw new ZmqException("Error on binding");

        // Guard on the hook that is actually called.
        static if(is(typeof(__onBind))) __onBind();
    }

    /**
     * Unbinds the socket from `endpoint`.
     *
     * Throws: ZmqException when `zmq_unbind` reports an error.
     */
    void unbind(string endpoint) {
        int rc = zmq_unbind(_sock, endpoint.toStringz);
        if (rc!=0) throw new ZmqException("Error on unbind");
    }

    /// Current value of the `_busy` flag (initialised to true; this class
    /// itself never changes it).
    @property bool isBusy() { return _busy; }

    /**
     * Sets the ZMQ_IDENTITY option to the bytes of `ident`.
     *
     * Throws: ZmqException when `zmq_setsockopt` reports an error.
     */
    @property void identity(string ident) {
        int rc = zmq_setsockopt(_sock, ZMQ_IDENTITY, ident.ptr, ident.length);
        if (rc!=0) throw new ZmqException("Error on setting identity");
    }

    // Pub, XPub and Push get only the write side, Sub, XSub and Pull only the
    // read side, every other socket type gets both.
    static if (type==ZmqSocketType.Pub || type==ZmqSocketType.XPub || type==ZmqSocketType.Push) {
        mixin WritableProperties;
        mixin WriteableSocket;
    } else static if (type==ZmqSocketType.Sub || type==ZmqSocketType.XSub || type==ZmqSocketType.Pull) {
        mixin ReadableProperties;
        mixin ReadableSocket;
    } else {
        mixin WritableProperties;
        mixin WriteableSocket;

        mixin ReadableProperties;
        mixin ReadableSocket;
    }

    static if (type==ZmqSocketType.Sub) {
        /**
         * Sets the ZMQ_SUBSCRIBE option to the bytes of `sub`.
         *
         * Throws: ZmqException when `zmq_setsockopt` reports an error.
         */
        void subscribe(string sub) {
            int rc = zmq_setsockopt(_sock, ZMQ_SUBSCRIBE, sub.ptr, sub.length);
            if (rc!=0) throw new ZmqException("Error on subscribe");
        }
    }
}
166         
version(unittest) {
    import core.thread;
    import std.concurrency;
    import std.stdio;

    /// Subscriber half of the test below; runs in a spawned thread.
    /// Waits for a start message, connects a Sub socket to port 54321,
    /// reports readiness to the owner, reads one message and reports again.
    void start_reading(shared ZmqContext ctx) {
        // Block until the owner thread sends the start signal.
        receive((bool start) { writeln("starting"); });

        auto localCtx = cast(ZmqContext)ctx;
        auto subscriber = localCtx.socket!(ZmqSocketType.Sub)();
        subscriber.subscribe("");

        writeln("connecting");
        subscriber.connect("tcp://localhost:54321");
        // First notification: the subscriber has issued its connect.
        send(ownerTid, true);

        auto message = subscriber.receive();
        writeln(message);

        // Second notification: a message was received.
        send(ownerTid, true);
    }

    /// Publishes one two-part message over TCP and expects the spawned
    /// subscriber to confirm that it received something.
    unittest {
        auto ctx = new ZmqContext();
        auto readerTid = spawn(&start_reading, cast(shared)ctx);

        auto publisher = ctx.socket!(ZmqSocketType.Pub)();
        publisher.bind("tcp://*:54321");

        // Let the reader start only after the publisher is bound.
        send(readerTid, true);
        auto subscriberReady = receiveOnly!bool();

        writeln("writing");
        // NOTE(review): the short pause presumably gives the subscription time
        // to reach the publisher before sending - confirm 1 ms is sufficient.
        Thread.sleep(1.msecs);
        publisher.send(["help", "me"]);

        auto delivered = receiveOnly!bool();
        assert(delivered);
    }
}
206