1 /**
2 Copyright: 2018 Mark Fisher
3 
4 License:
5 Permission is hereby granted, free of charge, to any person obtaining a copy of
6 this software and associated documentation files (the "Software"), to deal in
7 the Software without restriction, including without limitation the rights to
8 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
9 of the Software, and to permit persons to whom the Software is furnished to do
10 so, subject to the following conditions:
11 
12 The above copyright notice and this permission notice shall be included in all
13 copies or substantial portions of the Software.
14 
15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 SOFTWARE.
22 **/
23 module dxx.util.notify;
24 
25 private import std.algorithm;
26 private import std.parallelism;
27 
28 private import std.experimental.logger;
29 
30 /++
Simple notification service.

An object that implements NotificationSource maintains a vector of
NotificationListener objects.

At any time a listener's callback method handleNotification may be invoked.
If the listener implements the interface ASyncNotificationListener then its
invocations are made in parallel.
39 
40 
41 ++/
42 
/// Callback contract for objects that want to receive notifications.
interface NotificationListener {
  /// Invoked with an untyped pointer to the notification payload.
  /// The pointee type is not carried by this interface; implementers cast
  /// `t` back to whatever type the sender passed.
  void handleNotification(void* t) shared;
}
46 
/// Marker interface: it declares no members of its own.
/// SyncNotificationSource in this module tests for it with a cast to decide
/// whether a listener goes on the parallel-dispatch list.
interface ASyncNotificationListener : NotificationListener
{
}
49 
/// Contract for objects that listeners can be attached to and detached from.
interface NotificationSource {
  /// Attach listener `n` to this source.
  void addNotificationListener(shared(NotificationListener) n) shared;

  /// Detach listener `n` from this source.
  void removeNotificationListener(shared(NotificationListener) n) shared;
}
54 
/// Typed adapter over NotificationListener: subclasses implement `handle`
/// with a `T*` payload instead of dealing with the raw `void*`.
abstract class NotificationListenerBase(T) : NotificationListener {
  /// Typed callback to be supplied by the concrete listener.
  abstract void handle(T* t) shared;

  /// Untyped entry point required by NotificationListener; reinterprets the
  /// payload pointer as `T*` (unchecked cast) and forwards it to `handle`.
  void handleNotification(void* t) shared {
    auto typed = cast(T*) t;
    handle(typed);
  }
}
62 
/++

Notification source. Extend this class to create an object that can send
notifications to a group of listeners.

Listeners are kept in two arrays: those that implement
ASyncNotificationListener are dispatched through a std.parallelism parallel
foreach, all others are invoked one after another on the calling thread.

NOTE(review): the listener arrays are modified without any locking in this
class; callers that add/remove listeners concurrently with send presumably
need to synchronize externally - confirm.

++/
class SyncNotificationSource : NotificationSource {
  /// Listeners that implement ASyncNotificationListener (parallel dispatch).
  shared shared(NotificationListener)[] _listenersAsync;
  /// All other listeners (sequential dispatch).
  shared shared(NotificationListener)[] _listeners;

  /++
  Register a listener. The listener is stored on the async list when it can be
  cast to ASyncNotificationListener, otherwise on the sync list.

  Params:
    n = listener to add; a null reference is ignored so that send never
        dereferences a null listener.
  ++/
  shared void addNotificationListener(shared(NotificationListener) n) {
    if(n is null) {
        return;
    }
    if(cast(shared(ASyncNotificationListener)) n is null) {
        debug(Notify) {
            info(typeid(this)," : addNotificationListener sync ",_listeners.length," ");
        }
        _listeners ~= n;
    } else {
        debug(Notify) {
            // Report the length of the list that is actually being extended.
            info(typeid(this)," : addNotificationListener async ",_listenersAsync.length," ");
        }
        _listenersAsync ~= n;
    }
  }

  /++
  Unregister a listener. Only the first matching entry of the relevant list is
  removed. Removing a listener that was never registered (or null) is a no-op.

  Params:
    n = listener to remove.
  ++/
  shared void removeNotificationListener(shared(NotificationListener) n) {
    if(n is null) {
        return;
    }
    if(cast(shared(ASyncNotificationListener)) n is null) {
        debug(Notify) {
            info(typeid(this)," : removeNotificationListener ",_listeners.length);
        }
        // countUntil yields -1 when n is absent; that must not reach remove().
        auto idx = _listeners.countUntil(n);
        if(idx >= 0) {
            _listeners = _listeners.remove(idx);
        }
    } else {
        debug(Notify) {
            info(typeid(this)," : removeNotificationListener async ",_listenersAsync.length);
        }
        auto idx = _listenersAsync.countUntil(n);
        if(idx >= 0) {
            _listenersAsync = _listenersAsync.remove(idx);
        }
    }
  }

//    shared(Notifier) notifier;
//    alias notifier this;

    /// Reference to the array of sequentially dispatched listeners.
    @property
    nothrow shared ref
    auto listeners() {
      return _listeners;
    }

    /// Reference to the array of listeners dispatched in parallel.
    @property
    nothrow shared ref inout
    auto listenersAsync() {
      return _listenersAsync;
    }

    /// Convenience for subclasses holding an unshared `this`: casts to shared
    /// and forwards to send.
    protected
    nothrow
    void _send(T)(T* t) {
      (cast(shared)this).send(t);
    }

    /++
    Deliver a notification to every registered listener.

    Sync listeners are called in order on the calling thread; async listeners
    are then called from a parallel foreach. Each listener call is guarded
    individually: an Exception thrown by one listener is logged and does not
    prevent the remaining listeners from being notified.

    Params:
      t = pointer to the payload, passed to listeners as void*. Must not be
          null (checked with assert).
    ++/
    nothrow shared
    void send(T)(T* t) {
    assert(t);
    debug(Notify) {
        try {
            info("SyncNotificationSource : send ",listeners.length);
        } catch(Exception) {
            // Logging is best effort inside a nothrow function.
        }
    }
    // Iterate over copies so the loops work on a snapshot of the lists.
    auto ar = _listeners.dup;
    foreach(x;ar) {
        try {
            debug(Notify) {
                info("sync notification");
            }
            x.handleNotification(cast(void*)t);
        } catch(Exception e) {
            try {
                error(e.message);
            } catch(Exception) {
            }
        }
    }
    auto ar2 = _listenersAsync.dup;
    // Skip the parallel machinery entirely when there is nothing to dispatch.
    if(ar2.length > 0) {
        try {
            foreach(x;ar2.parallel) {
                // Guard each listener separately: an exception escaping the
                // loop body would stop further work units from being
                // submitted, so other listeners could be skipped.
                try {
                    debug(Notify) {
                        sharedLog.info("async notification ",typeid(x));
                    }
                    x.handleNotification(cast(void*)t);
                } catch(Exception e) {
                    try {
                        sharedLog.error(e.message);
                    } catch(Exception) {
                    }
                }
            }
        } catch(Exception e) {
            // Anything raised by the parallel foreach itself.
            try {
                sharedLog.error(e.message);
            } catch(Exception) {
            }
        }
    }
    }
}
163 
164 
// A plain (non-async) listener is called by send while registered and is no
// longer called after it has been removed.
unittest {
    import std.stdio : writefln;

    // Set by the listener whenever it receives a notification.
    shared(bool) notified = false;

    class FlagListener : NotificationListener {
        override void handleNotification(void* t) shared {
            writefln("handleNotification");
            notified = true;
        }
    }

    auto source = new shared(SyncNotificationSource);
    shared(NotificationListener) listener = new shared(FlagListener);
    string payload = "";

    // Registered: the notification must arrive.
    source.addNotificationListener(listener);
    source.send(&payload);
    assert(notified);

    // Unregistered: the flag must stay cleared.
    notified = false;
    source.removeNotificationListener(listener);
    source.send(&payload);
    assert(!notified);
}
186 
// Same scenario as the sync test, but the listener also implements
// ASyncNotificationListener and is therefore dispatched via the async list.
unittest {
    import core.thread : Thread, msecs;
    import std.stdio : writefln;

    // Set by the listener whenever it receives a notification.
    shared(bool) notified = false;

    class AsyncFlagListener : NotificationListener, ASyncNotificationListener {
        override void handleNotification(void* t) shared {
            writefln("handleNotification");
            notified = true;
        }
    }

    // Grace period given to the async dispatch before the flag is checked.
    // NOTE(review): std.parallelism's parallel foreach is documented to return
    // only after all iterations finish, so this wait may be unnecessary - confirm.
    immutable settle = 500.msecs;

    auto source = new shared(SyncNotificationSource);
    shared(NotificationListener) listener = new shared(AsyncFlagListener);
    string payload = "";

    // Registered: the notification must arrive.
    source.addNotificationListener(listener);
    source.send(&payload);
    Thread.sleep(settle);
    assert(notified);

    // Unregistered: the flag must stay cleared.
    notified = false;
    source.removeNotificationListener(listener);
    source.send(&payload);
    Thread.sleep(settle);
    assert(!notified);
}
212 
// The payload pointer handed to send reaches the listener intact: the
// listener casts it back and mutates the object it refers to.
unittest {
    import std.stdio : writefln;

    class Target {
        bool done = false;
    }

    class MarkingListener : NotificationListener {
        override void handleNotification(void* t) shared {
            writefln("handleNotification");
            assert(t);
            // t points at the sender's Target reference, not at the object itself.
            auto targetRef = cast(Target*) t;
            assert(targetRef);
            targetRef.done = true;
        }
    }

    auto source = new shared(SyncNotificationSource);
    shared(NotificationListener) listener = new shared(MarkingListener);
    Target target = new Target;

    // Registered: the listener flips the flag through the pointer.
    source.addNotificationListener(listener);
    source.send(&target);
    assert(target.done);

    // Unregistered: nothing touches the flag any more.
    target.done = false;
    source.removeNotificationListener(listener);
    source.send(&target);
    assert(!target.done);
}