1 module dzmq.socket;
2 
3 import dzmq.concurrency_mixins;
4 
5 import deimos.zmq.zmq;
6 import std.conv: to;
7 import std.string;
8 import std.algorithm: find, remove;
9 import std.stdio;
10 import std.exception;
11 
/**
 * Socket kinds that can be requested from a ZmqContext.
 *
 * Each member carries the value of the matching ZMQ_* constant from the
 * C binding, so it can be cast straight to the int expected by zmq_socket.
 */
enum ZmqSocketType
{
    Pair   = ZMQ_PAIR,
    Pub    = ZMQ_PUB,
    Sub    = ZMQ_SUB,
    Req    = ZMQ_REQ,
    Rep    = ZMQ_REP,
    Dealer = ZMQ_DEALER,
    Router = ZMQ_ROUTER,
    Pull   = ZMQ_PULL,
    Push   = ZMQ_PUSH,
    XPub   = ZMQ_XPUB,
    XSub   = ZMQ_XSUB
}
25 
/**
 * Thrown when an operation does not complete within its time limit.
 *
 * Params:
 *   msg  = human readable description; defaults to a generic timeout text
 *   file = source file of the throw site (filled in by the compiler)
 *   line = source line of the throw site (filled in by the compiler)
 */
class ZmqTimeout: Exception {
    this(string msg = "Operation timed out",
         string file = __FILE__, size_t line = __LINE__) {
        // Forward the call-site location so stack-less reports still point
        // at the code that raised the timeout rather than at this class.
        super(msg, file, line);
    }
}
31 
/**
 * Error raised when a ZeroMQ call reports failure.
 *
 * Derives from ErrnoException, so the current value of the C `errno` is
 * captured at construction time and appended to the message by the base
 * class.
 *
 * Params:
 *   msg  = short description of the failed operation
 *   file = source file of the throw site (filled in by the compiler)
 *   line = source line of the throw site (filled in by the compiler)
 */
class ZmqException: ErrnoException {
    this(string msg, string file = __FILE__, size_t line = __LINE__) {
        // ErrnoException defaults file/line to null/0; pass the real call
        // site through so the exception carries a usable location.
        super(msg, file, line);
    }
}
37 
/**
 * Owning wrapper around a ZeroMQ context handle.
 *
 * A context created through the public constructor owns the underlying
 * handle and terminates it in terminate() or, at the latest, in the
 * destructor. Handles obtained through dup() refer to the same native
 * context but do not own it, so the native context is terminated only once.
 */
class ZmqContext {
    private {
        immutable void * _ctx;
        bool _terminated = false;
        // true only for the instance that created the native context;
        // duplicates must never call zmq_term on the shared handle.
        bool _owner = true;

        // Wraps an already existing native context without taking ownership.
        this (immutable void * ctx) {
            _ctx = ctx;
            _owner = false;
        }
    }

    /**
     * Creates a new native context.
     *
     * Params:
     *   threads = value passed to zmq_init (size of the I/O thread pool)
     *
     * Throws: ZmqException if zmq_init returns null.
     */
    this(int threads=1){
        void* raw = zmq_init(threads);
        if (raw is null) throw new ZmqException("Error on context creation");
        _ctx = cast(immutable(void*))raw;
    }

    /// Terminates the native context if this instance owns it and has not
    /// done so already.
    ~this() {
        terminate();
    }

    /**
     * Returns another ZmqContext object referring to the same native
     * context. The returned object is non-owning: its terminate() and
     * destructor leave the native context alone, so the creating instance
     * must outlive every duplicate.
     */
    auto dup() {
        return new ZmqContext(_ctx);
    }

    /**
     * Terminates the native context. Safe to call more than once; only the
     * first call on the owning instance reaches zmq_term.
     */
    void terminate(){
        if (_terminated) return;
        _terminated = true;
        // Duplicates share the handle; terminating it here would make the
        // owner's later zmq_term operate on an already destroyed context.
        if (!_owner) return;
        zmq_term(cast(void*)_ctx);
        debug { writeln("Terminated ctx"); }
    }

    /**
     * Creates a socket of the given type on this context.
     *
     * Params:
     *   sockType          = kind of socket to create (template argument)
     *   ConcurrencyPolicy = mixin template applied to the socket class
     *
     * Returns: a new ZmqSocket wrapping the native socket.
     * Throws: ZmqException if zmq_socket returns null.
     */
    auto ref socket(ZmqSocketType sockType, alias ConcurrencyPolicy = DefaultConcurrencyPolicy)(){
        void* raw = zmq_socket(cast(void*)_ctx, cast(int)sockType);
        if (raw is null) throw new ZmqException("Error on socket creation");
        return new ZmqSocket!(sockType, ConcurrencyPolicy)(raw);
    }
}
70 
/// Adds a `can_write` property to a type that exposes a native socket
/// handle named `_sock`.
private mixin template WritableProperties() {
    /**
     * Reads the ZMQ_EVENTS option of `_sock` and reports whether the
     * ZMQ_POLLOUT bit is set. The result of zmq_getsockopt itself is not
     * inspected; if the call leaves the flags untouched they stay at 0 and
     * the property yields false.
     */
    @property bool can_write() {
        int flags;
        size_t flagsLen = flags.sizeof;
        zmq_getsockopt(_sock, ZMQ_EVENTS, &flags, &flagsLen);

        immutable int outBits = flags & ZMQ_POLLOUT;
        return outBits == ZMQ_POLLOUT;
    }
}
79 
/// Adds a `can_read` property to a type that exposes a native socket
/// handle named `_sock`.
private mixin template ReadableProperties() {
    /**
     * Reads the ZMQ_EVENTS option of `_sock` and reports whether the
     * ZMQ_POLLIN bit is set. The result of zmq_getsockopt itself is not
     * inspected; if the call leaves the flags untouched they stay at 0 and
     * the property yields false.
     */
    @property bool can_read() {
        int flags;
        size_t flagsLen = flags.sizeof;
        zmq_getsockopt(_sock, ZMQ_EVENTS, &flags, &flagsLen);

        immutable int inBits = flags & ZMQ_POLLIN;
        return inBits == ZMQ_POLLIN;
    }
}
88 
/**
 * Wrapper around a native ZeroMQ socket.
 *
 * Params:
 *   type              = socket kind; selects which read/write mixins and
 *                       which extra methods (e.g. subscribe) are compiled in
 *   ConcurrencyPolicy = mixin template mixed into the class. If it declares
 *                       `__onConnect`, `__onBind` or `__onClose`, those are
 *                       called after a successful connect/bind and before
 *                       the native socket is closed, respectively.
 *
 * Instances are created through ZmqContext.socket; the constructor is
 * private to this module.
 */
class ZmqSocket(ZmqSocketType type, alias ConcurrencyPolicy = DefaultConcurrencyPolicy) {
    mixin ConcurrencyPolicy;

    private {
        void * _sock;
        ZmqSocketType _type = type;
        bool _closed = false;
        bool _busy = true;

        this(void * socket) {
            _sock = socket;
        }

        // Runs the close hook (if the policy has one), closes the native
        // socket and marks this object closed. Returns zmq_close's result
        // so the caller decides whether a failure should be reported.
        int releaseHandle() {
            static if(is(typeof(__onClose))) __onClose();
            debug { writeln("Closed socket"); }
            int rc = zmq_close(_sock);
            _closed = true;
            return rc;
        }
    }

    /// Closes the socket if close() was not called explicitly.
    ~this() {
        // Deliberately does not go through close(): constructing and
        // throwing an exception from a finalizer is not allowed, so a
        // failing zmq_close is ignored here.
        if (!_closed) releaseHandle();
    }

    /**
     * Closes the native socket. Calling it again is a no-op.
     *
     * Throws: ZmqException if zmq_close reports failure.
     */
    void close() {
        if (_closed) return;
        int rc = releaseHandle();
        if (rc!=0) throw new ZmqException("Error on close");
    }

    /**
     * Connects the socket to `endpoint`.
     *
     * Throws: ZmqException if zmq_connect reports failure.
     */
    void connect(string endpoint) {
        int rc = zmq_connect(_sock, endpoint.toStringz);
        // Trace output only in debug builds, like the other diagnostics in
        // this module; writefln is required for the %d to be substituted.
        debug { writefln("Connecting rc is %d", rc); }
        if (rc!=0) throw new ZmqException("Error on connecting");

        static if(is(typeof(__onConnect))) __onConnect();
    }

    /**
     * Disconnects the socket from `endpoint`.
     *
     * Throws: ZmqException if zmq_disconnect reports failure.
     */
    void disconnect(string endpoint = null) {
        int rc = zmq_disconnect(_sock, endpoint.toStringz);
        if (rc!=0) throw new ZmqException("Error on disconnect");
    }

    /**
     * Binds the socket to `endpoint`.
     *
     * Throws: ZmqException if zmq_bind reports failure.
     */
    void bind(string endpoint) {
        int rc = zmq_bind(_sock, endpoint.toStringz);
        if (rc!=0) throw new ZmqException("Error on binding");

        static if(is(typeof(__onBind))) __onBind();
    }

    /**
     * Removes a binding previously established on `endpoint`.
     *
     * Throws: ZmqException if zmq_unbind reports failure.
     */
    void unbind(string endpoint) {
        int rc = zmq_unbind(_sock, endpoint.toStringz);
        if (rc!=0) throw new ZmqException("Error on unbind");
    }

    /// Current value of the internal busy flag. This class itself never
    /// changes it; NOTE(review): presumably maintained by the mixed-in
    /// policy or read/write mixins — confirm in dzmq.concurrency_mixins.
    @property bool isBusy() { return _busy; }

    /**
     * Sets the ZMQ_IDENTITY option of the socket.
     *
     * Throws: ZmqException if zmq_setsockopt reports failure.
     */
    @property void identity(string ident) {
        int rc = zmq_setsockopt(_sock, ZMQ_IDENTITY, ident.ptr, ident.length);
        if (rc!=0) throw new ZmqException("Error on setting identity");
    }

    // Write-only kinds get the writable API, read-only kinds the readable
    // API, every other kind gets both. WriteableSocket / ReadableSocket are
    // not defined in this file (presumably in dzmq.concurrency_mixins).
    static if (type==ZmqSocketType.Pub || type==ZmqSocketType.XPub || type==ZmqSocketType.Push) {
        mixin WritableProperties;
        mixin WriteableSocket;
    } else static if (type==ZmqSocketType.Sub || type==ZmqSocketType.XSub || type==ZmqSocketType.Pull) {
        mixin ReadableProperties;
        mixin ReadableSocket;
    } else {
        mixin WritableProperties;
        mixin WriteableSocket;

        mixin ReadableProperties;
        mixin ReadableSocket;
    }

    static if (type==ZmqSocketType.Sub) {
        /**
         * Adds a ZMQ_SUBSCRIBE filter with the bytes of `sub`.
         *
         * Throws: ZmqException if zmq_setsockopt reports failure.
         */
        void subscribe(string sub) {
            int rc = zmq_setsockopt(_sock, ZMQ_SUBSCRIBE, sub.ptr, sub.length);
            if (rc!=0) throw new ZmqException("Error on subscribe");
        }
    }
}
167         
version(unittest) {
    import std.concurrency;
    import std.stdio;
    import core.thread;

    // Subscriber side of the round-trip test below. Waits for a start
    // signal from the owner thread, connects a Sub socket, tells the owner
    // it has connected, then blocks for one message and reports back.
    void start_reading(shared ZmqContext ctx) {
        receive(
                (bool start) { writeln("starting");}
               );

        // The context is passed as shared only to get it across spawn();
        // strip the qualifier to use the regular API.
        auto inctx = cast(ZmqContext)ctx;
        auto subsock = inctx.socket!(ZmqSocketType.Sub)();
        subsock.subscribe("");
        writeln("connecting");
        subsock.connect("tcp://localhost:54321");
        send(ownerTid, true);
        auto res = subsock.receive();
        writeln(res);

        send(ownerTid, true);
    }

    // Publishes one multi-part message and checks that a subscriber running
    // in another thread receives it.
    unittest {
        auto ctx = new ZmqContext();
        auto task = spawn(&start_reading, cast(shared)ctx);

        auto sock = ctx.socket!(ZmqSocketType.Pub)();
        sock.bind("tcp://*:54321");
        send(task, true);
        auto read = receiveOnly!(bool);
        writeln("writing");
        // connect() returning on the subscriber does not mean the TCP
        // connection and subscription are established yet; a publisher
        // drops messages that have no subscriber. Give the subscriber a
        // realistic amount of time before publishing, otherwise it can
        // block forever waiting for a message that was discarded.
        Thread.sleep(100.msecs);
        sock.send(["help", "me"]);

        auto success = receiveOnly!(bool);
        assert(success);
    }
}
207