1 /**
2 Copyright 2018 Mark Fisher
3 
4 Permission is hereby granted, free of charge, to any person obtaining a copy of
5 this software and associated documentation files (the "Software"), to deal in
6 the Software without restriction, including without limitation the rights to
7 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
8 of the Software, and to permit persons to whom the Software is furnished to do
9 so, subject to the following conditions:
10 
11 The above copyright notice and this permission notice shall be included in all
12 copies or substantial portions of the Software.
13 
14 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 SOFTWARE.
21 **/
22 module dxx.util.notify;
23 
24 private import std.algorithm;
25 private import std.parallelism;
26 
27 private import std.experimental.logger;
28 
29 /++
30 Simple notification service.
31 
32 An object that inherits NotificationSource maintains a vector of
33 NotificationListener's.
34 
35 At any time the listeners callback method handleNotification may be invoked.
36 If the client implements the interface ASyncNotificationListener then the
37 invocations should be in parallel.
38 
39 
40 ++/
41 
42 interface NotificationListener {
43   shared void handleNotification(void* t);
44 }
45 
46 interface ASyncNotificationListener : NotificationListener {
47 }
48 
49 interface NotificationSource {
50   shared void addNotificationListener(shared(NotificationListener) n);
51   shared void removeNotificationListener(shared(NotificationListener) n);
52 }
53 
54 abstract class NotificationListenerBase(T) : NotificationListener {
55   abstract shared void handle(T* t);
56   shared void handleNotification(void* t) {
57     this.handle(cast(T*)t);
58   }
59 
60 }
61 
62 /++
63 
64 Synchronized notification handler. Extend this class to create an object that can send
65 notifications to a group of listeners.
66 
67 ++/
68 class SyncNotificationSource : NotificationSource {
69   shared shared(NotificationListener)[] _listenersAsync;
70   shared shared(NotificationListener)[] _listeners;
71 
72   shared void addNotificationListener(shared(NotificationListener) n) {
73     if(cast(shared(ASyncNotificationListener)) n is null) {
74         debug(Notify) {
75             info(typeid(this)," : addNotificationListener sync ",_listeners.length," ");
76         }
77         _listeners ~= n;
78     } else {
79         debug(Notify) {
80             info(typeid(this)," : addNotificationListener async ",_listeners.length," ");
81         }
82         _listenersAsync ~= n;
83     }
84   }
85 
86   shared void removeNotificationListener(shared(NotificationListener) n) {
87     if(cast(shared(ASyncNotificationListener)) n is null) {
88         debug(Notify) {
89             info(typeid(this)," : removeNotificationListener ",_listeners.length);
90         }
91         _listeners = _listeners.remove(_listeners.countUntil(n));
92     } else {
93         debug(Notify) {
94             info(typeid(this)," : removeNotificationListener async",_listenersAsync.length);
95         }
96         _listenersAsync = _listenersAsync.remove(_listenersAsync.countUntil(n));
97     }
98   }
99 
100 //    shared(Notifier) notifier;
101 //    alias notifier this;
102 
103     @property
104     nothrow shared ref
105     auto listeners() {
106       return _listeners;
107     }
108 
109     @property
110     nothrow shared ref inout
111     auto listenersAsync() {
112       return _listenersAsync;
113     }
114     nothrow
115     void _send(T)(T* t) {
116       (cast(shared)this).send(t);
117     }
118     nothrow shared
119     void send(T)(T* t) {
120     assert(t);
121     debug(Notify) {
122         try {
123             info("SyncNotificationSource : send ",listeners.length);
124         } catch(Exception e) {
125             //error(e.message);
126         }
127     }
128     //shared(NotificationListener)[] ar = listeners.dup;
129     auto ar = _listeners.dup;
130     foreach(x;ar) {
131         try {
132             debug(Notify) {
133                 info("sync notification");
134             }
135             x.handleNotification(cast(void*)t);
136         } catch(Exception e) {
137             try {
138                 error(e.message);
139             } catch(Exception) {
140             }
141         }
142     }
143     auto ar2 = _listenersAsync.dup;
144     try {
145         foreach(x;ar2.parallel) {
146             //assert(x);
147             debug(Notify) {
148                 sharedLog.info("async notification ",typeid(x));
149             }
150             x.handleNotification(cast(void*)t);
151         }
152     } catch(Exception e) {
153         try {
154             sharedLog.error(e.message);
155         } catch(Exception) {
156         }
157     }
158     }
159 }
160 
161 unittest {
162     shared(bool) done = false;
163     class TestNotificationListener : NotificationListener {
164         override shared void handleNotification(void* t) {
165             done = true;
166         }
167     }
168     auto n = new shared(SyncNotificationSource);
169     shared(NotificationListener) l = new shared(TestNotificationListener);
170     n.addNotificationListener(l);
171     string s = "";
172     n.send!string(&s);
173     assert(done is true);
174 
175     done = false;
176     n.removeNotificationListener(l);
177     n.send!string(&s);
178     assert(done is false);
179 }
180 
181 unittest {
182     import core.thread;
183 
184     shared(bool) done = false;
185     class TestNotificationListener : NotificationListener,ASyncNotificationListener {
186         override shared void handleNotification(void* t) {
187             done = true;
188         }
189     }
190     auto n = new shared(SyncNotificationSource);
191     shared(NotificationListener) l = new shared(TestNotificationListener);
192     n.addNotificationListener(l);
193     string s = "";
194     n.send!string(&s);
195     Thread.sleep(dur!("msecs")( 500 ));
196     assert(done is true);
197 
198     done = false;
199     n.removeNotificationListener(l);
200     n.send!string(&s);
201     Thread.sleep(dur!("msecs")( 500 ));
202     assert(done is false);
203 }
204 
205 unittest {
206     class TestHandler {
207         bool done = false;
208     }
209     class TestNotificationListener : NotificationListener {
210         override shared void handleNotification(void* t) {
211             assert(t);
212             auto a = cast(TestHandler*)t;
213             assert(a);
214             a.done = true;
215         }
216     }
217     auto n = new shared(SyncNotificationSource);
218     shared(NotificationListener) l = new shared(TestNotificationListener);
219     TestHandler testHandler = new TestHandler;
220 
221     n.addNotificationListener(l);
222     //string s = "";
223     n.send(&testHandler);
224     assert(testHandler.done is true);
225 
226     testHandler.done = false;
227     n.removeNotificationListener(l);
228     n.send(&testHandler);
229     assert(testHandler.done is false);
230 }