1 /**
2 Copyright 2018 Mark Fisher
3 
4 Permission is hereby granted, free of charge, to any person obtaining a copy of
5 this software and associated documentation files (the "Software"), to deal in 
6 the Software without restriction, including without limitation the rights to 
7 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 
8 of the Software, and to permit persons to whom the Software is furnished to do 
9 so, subject to the following conditions:
10 
11 The above copyright notice and this permission notice shall be included in all 
12 copies or substantial portions of the Software.
13 
14 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
15 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
16 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 
17 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
18 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 
19 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 
20 SOFTWARE.
21 **/
22 module dxx.sys.spawn;
23 
24 /**
25 taken from https://github.com/buggins/dlangide/blob/master/src/dlangide/builders/extprocess.d
26 module dlangide.builders.extprocess;
27 
28 **/
29 
30 private import std.process;
31 private import std.stdio;
32 private import std.utf;
33 private import std.stdio;
34 private import core.thread;
35 private import core.sync.mutex;
36 
37 private import dxx.util;
38 
39 mixin __Text;
40 
/// Sink to which captured process output is forwarded.
interface TextWriter {
    /// Accepts one chunk of output text.
    void writeText(dstring text);
}
46 
/// Source from which accumulated text can be fetched.
interface TextReader {
    /// Returns the next portion of available text.
    dstring readText();
}
52 
/// Mutex-protected text box: text appended with writeText() from one thread
/// can be drained with readText() from another.
///
/// All access to the buffer, including close(), happens under the mutex so
/// that closing cannot race with an append or a read that is in progress.
class ProtectedTextStorage : TextReader, TextWriter {

    private Mutex _mutex;
    private shared bool _closed;
    private dchar[] _buffer;

    this() {
        _mutex = new Mutex();
    }

    /// true once close() has been called
    @property bool closed() { return _closed; }

    /// Marks the storage as closed and discards any text not yet read.
    /// Subsequent writes are ignored and subsequent reads return null.
    /// Calling it more than once is harmless.
    void close() {
        _mutex.lock();
        scope(exit) _mutex.unlock();
        if (_closed)
            return;
        _closed = true;
        _buffer = null;
    }

    /// Appends text to the buffer; does nothing if the storage is closed.
    override void writeText(dstring text) {
        _mutex.lock();
        scope(exit) _mutex.unlock();
        // checked under the lock so a concurrent close() cannot slip in
        // between the check and the append
        if (_closed)
            return;
        _buffer ~= text;
    }

    /// Returns all text accumulated since the previous read and empties the
    /// buffer. Returns null when nothing is buffered or the storage is closed.
    override dstring readText() {
        _mutex.lock();
        scope(exit) _mutex.unlock();
        if (_closed || !_buffer.length)
            return null;
        dstring res = _buffer.idup;
        _buffer = null;
        return res;
    }
}
101 
/// State reported by ExternalProcess.
enum ExternalProcessState : uint {
    /// not initialized
    None = 0,
    /// running
    Running = 1,
    /// stop is requested
    Stopping = 2,
    /// stopped
    Stopped = 3,
    /// error occurred, e.g. cannot run process
    Error = 4
}
114 
/// Base class for reading text from a std.stdio.File in a background thread.
///
/// The thread reads the file one byte at a time, collects the bytes until a
/// '\n' arrives (or the stream ends), decodes the collected bytes as UTF-8,
/// normalizes line endings to '\n' and passes the result to sendResult().
/// Subclasses override sendResult() and handleFinish().
class BackgroundReaderBase : Thread {
    private std.stdio.File _file;
    private shared bool _finished;
    private ubyte[1] _byteBuffer;
    private ubyte[] _bytes;
    dchar[] _textbuffer;
    private int _len;
    private bool _utfError;

    /// Creates the reader thread for an already opened file; call start() to begin reading.
    this(std.stdio.File f) {
        super(&run);
        assert(f.isOpen());
        _file = f;
        _len = 0;
        _finished = false;
    }

    /// true once the reading thread has completed its work
    @property bool finished() {
        return _finished;
    }

    /// the byte most recently passed to addByte()
    ubyte prevchar;

    /// Appends one raw byte to the pending buffer, growing the buffer as
    /// needed, and flushes when the byte completes a line ('\n').
    void addByte(ubyte data) {
        if (_bytes.length < _len + 1)
            _bytes.length = _bytes.length ? _bytes.length * 2 : 1024;
        _bytes[_len++] = data;
        if (data == '\n')
            flush();
        prevchar = data;
    }

    /// Decodes the pending bytes, normalizes line endings and hands the text
    /// to sendResult(). Does nothing when no bytes are pending.
    void flush() {
        if (!_len)
            return;
        if (_textbuffer.length < _len)
            _textbuffer.length = _len + 256;
        // Decode only the bytes collected since the last flush. _bytes is
        // reused between flushes, so decoding from the whole array could let
        // a truncated trailing sequence pick up stale bytes past _len.
        auto pending = cast(string) _bytes[0 .. _len];
        size_t count = 0;
        for(size_t i = 0; i < pending.length;) {
            dchar ch = 0;
            if (_utfError) {
                // after the first decoding failure every byte is passed through as is
                ch = pending[i++];
            } else {
                try {
                    ch = decode(pending, i);
                } catch (UTFException e) {
                    _utfError = true;
                    ch = pending[i++];
                    MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_NONUNICODE_PROC_OUTPUT));
                }
            }
            _textbuffer[count++] = ch;
        }
        _len = 0;

        if (!count)
            return;

        // fix line endings - must be '\n'
        count = convertLineEndings(_textbuffer[0..count]);

        // data is ready to send
        if (count)
            sendResult(_textbuffer[0..count].dup);
    }

    /// In-place conversion of line endings to unix format (\n).
    /// "\r\n", "\n\r", a lone "\r" and a lone "\n" each become one '\n'.
    /// Returns the length of the converted text at the start of `text`.
    size_t convertLineEndings(dchar[] text) {
        size_t src = 0;
        size_t dst = 0;
        for(;src < text.length;) {
            dchar ch = text[src++];
            dchar nextch = src < text.length ? text[src] : 0;
            if (ch == '\n') {
                if (nextch == '\r')
                    src++;
                text[dst++] = '\n';
            } else if (ch == '\r') {
                if (nextch == '\n')
                    src++;
                text[dst++] = '\n';
            } else {
                text[dst++] = ch;
            }
        }
        return dst;
    }

    /// Called from the reading thread with each chunk of decoded text.
    protected void sendResult(dstring text) {
        // override to deal with ready data
    }

    /// Called from the reading thread just before it marks itself finished.
    protected void handleFinish() {
        // override to do something when thread is finishing
    }

    /// Thread body: reads the file byte by byte until end of stream or a read
    /// error, closes it, flushes the remaining bytes and marks the reader finished.
    private void run() {
        // read file by bytes
        try {
            version (Windows) {
                import core.sys.windows.windows;
                // separate version for windows as workaround for hanging rawRead
                HANDLE h = _file.windowsHandle;
                DWORD bytesRead = 0;
                DWORD err;
                for (;;) {
                    BOOL res = ReadFile(h, _byteBuffer.ptr, 1, &bytesRead, null);
                    if (res) {
                        if (bytesRead == 1)
                            addByte(_byteBuffer[0]);
                    } else {
                        err = GetLastError();
                        if (err == ERROR_MORE_DATA) {
                            if (bytesRead == 1)
                                addByte(_byteBuffer[0]);
                            continue;
                        }
                        // any other error ends the reading loop
                        break;
                    }
                }
            } else {
                for (;;) {
                    if (_file.eof)
                        break;
                    ubyte[] r = _file.rawRead(_byteBuffer);
                    if (!r.length)
                        break;
                    addByte(r[0]);
                }
            }
            _file.close();
            flush();
        } catch (Exception e) {
            // Read failures are deliberately not propagated: the thread must
            // still reach handleFinish() and set _finished below.
        }
        handleFinish();
        _finished = true;
    }

    /// Blocks until the reading thread has ended; an exception that escaped
    /// the thread is not rethrown (join is called with rethrow = false).
    void waitForFinish() {
        join(false);
    }

}
274 
/// Background reader that forwards every chunk of decoded text to a TextWriter.
/// Warning: the writer is called from the background thread.
class BackgroundReader : BackgroundReaderBase {
    protected TextWriter _destination;

    /// Reads from `f` and forwards the text to `destination`, which must not be null.
    this(std.stdio.File f, TextWriter destination) {
        super(f);
        assert(destination);
        this._destination = destination;
    }

    /// Passes the ready text on to the destination writer.
    protected override void sendResult(dstring text) {
        this._destination.writeText(text);
    }

    /// Drops the reference to the destination so the GC can reclaim it.
    protected override void handleFinish() {
        this._destination = null;
    }
}
292 
/// Runs an external process, captures its output through background readers
/// and allows polling, waiting for, killing and writing to the process.
class ExternalProcess {

    protected char[][] _args;
    protected char[] _workDir;
    protected char[] _program;
    protected string[string] _env;
    protected TextWriter _stdout;
    protected TextWriter _stderr;
    protected BackgroundReader _stdoutReader;
    protected BackgroundReader _stderrReader;
    protected ProcessPipes _pipes;
    protected ExternalProcessState _state;

    protected int _result;

    /// current state of the process
    @property ExternalProcessState state() { return _state; }
    /// returns process result for stopped process
    @property int result() { return _result; }

    this() {
    }

    /// Convenience overload taking immutable strings; copies the arguments
    /// and forwards to the char[] overload.
    ExternalProcessState run(string program, string[]args, string dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
        char[][] arguments;
        foreach(a; args)
            arguments ~= a.dup;
        return run(program.dup, arguments, dir.dup, stdoutTarget, stderrTarget);
    }

    /// Starts `program` with `args` in working directory `dir`.
    ///
    /// Params:
    ///   program      = executable name, resolved with findExecutablePath()
    ///   args         = command line arguments (without the program itself)
    ///   dir          = working directory for the child process
    ///   stdoutTarget = receives the child's stdout; must not be null
    ///   stderrTarget = receives the child's stderr; when null, stderr is
    ///                  merged into stdout
    /// Returns: Running on success, Error when the executable cannot be
    ///          found or the process cannot be started.
    ExternalProcessState run(char[] program, char[][]args, char[] dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
        MsgLog.trace(MsgText!(DXXConfig.messages.MSG_PROC_RUN)(program,args));
        _state = ExternalProcessState.None;
        _program = findExecutablePath(cast(string)program).dup;
        if (!_program) {
            _state = ExternalProcessState.Error;
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_NOT_FOUND)(program));
            return _state;
        }
        _args = args;
        _workDir = dir;
        _stdout = stdoutTarget;
        _stderr = stderrTarget;
        _result = 0;
        assert(_stdout);
        Redirect redirect;
        char[][] params;
        params ~= _program;
        params ~= _args;
        if (!_stderr)
            redirect = Redirect.stdout | Redirect.stderrToStdout | Redirect.stdin;
        else
            redirect = Redirect.all;
        try {
            _pipes = pipeProcess(params, redirect, _env, std.process.Config.suppressConsole, _workDir);
            _state = ExternalProcessState.Running;
            // start readers
            _stdoutReader = new BackgroundReader(_pipes.stdout, _stdout);
            _stdoutReader.start();
            if (_stderr) {
                _stderrReader = new BackgroundReader(_pipes.stderr, _stderr);
                _stderrReader.start();
            }
        } catch (ProcessException e) {
            // a failed launch must be reported as Error, not left as None
            _state = ExternalProcessState.Error;
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_RUN)(program,e));
        } catch (std.stdio.StdioException e) {
            _state = ExternalProcessState.Error;
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_REDIR)(program,e));
        } catch (Throwable e) {
            _state = ExternalProcessState.Error;
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_UNKNOWN)(program,e));
        }
        return _state;
    }

    /// Waits for the output reader threads to finish and releases them.
    /// Failures are logged and do not propagate to the caller.
    protected void waitForReadingCompletion() {
        try {
            if (_stdoutReader && !_stdoutReader.finished) {
                _pipes.stdout.detach();
                _stdoutReader.waitForFinish();
            }
            _stdoutReader = null;
        } catch (Exception e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_WAITING_STDOUT)(_program,e));
        }
        try {
            if (_stderrReader && !_stderrReader.finished) {
                _pipes.stderr.detach();
                _stderrReader.waitForFinish();
            }
            // released unconditionally, same as the stdout reader
            _stderrReader = null;
        } catch (Exception e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_WAITING_STDERR)(_program,e));
        }
    }

    /// Checks, without blocking on the process, whether it has terminated.
    /// On termination the exit status is stored in result, the state becomes
    /// Stopped and the output readers are waited for.
    /// Returns: the current state.
    ExternalProcessState poll() {
        if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
            return _state;
        // check for process finishing
        try {
            auto pstate = std.process.tryWait(_pipes.pid);
            if (pstate.terminated) {
                _state = ExternalProcessState.Stopped;
                _result = pstate.status;
                waitForReadingCompletion();
            }
        } catch (Exception e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_WAITING)(_program));
            _state = ExternalProcessState.Error;
        }
        return _state;
    }

    /// Blocks until the process terminates, stores its exit status in result
    /// and waits for the output readers.
    /// Returns: Stopped on success, Error if waiting failed; returns the
    ///          current state immediately when no process is active.
    ExternalProcessState wait() {
        MsgLog.info(MsgText!(DXXConfig.messages.MSG_PROC_WAITING));
        if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
            return _state;
        try {
            _result = std.process.wait(_pipes.pid);
            _state = ExternalProcessState.Stopped;
            MsgLog.trace(MsgText!(DXXConfig.messages.MSG_PROC_READWAITING));
            waitForReadingCompletion();
        } catch (Exception e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_UNKNOWN));
            _state = ExternalProcessState.Error;
        }
        return _state;
    }

    /// Requests process stop: a Running process is killed and the state
    /// becomes Stopping; poll() or wait() later moves it to Stopped.
    /// Returns: the current state.
    ExternalProcessState kill() {
        MsgLog.info(MsgText!(DXXConfig.messages.MSG_PROC_KILL));

        if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
            return _state;
        if (_state == ExternalProcessState.Running) {
            std.process.kill(_pipes.pid);
            _state = ExternalProcessState.Stopping;
        }
        return _state;
    }

    /// Writes `data` to the process's stdin and flushes it.
    /// Returns: false when no process is active (state None, Stopped or
    ///          Error), true after the data has been written.
    bool write(string data) {
        if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
            return false;
        _pipes.stdin.write(data);
        _pipes.stdin.flush();
        return true;
    }
}
457 private import std.algorithm;
458 private import std.process;
459 private import std.path;
460 private import std.file;
461 private import std.utf;
462 
/// For an executable name without a path, finds the absolute path to the executable.
///
/// The directory of the running program is searched first, then every
/// directory listed in the PATH environment variable, in order. On Windows
/// ".exe" is appended to the name when it does not already end with it.
///
/// Params:
///   executableName = name of the program to look for
/// Returns: absolute path of the first matching regular file, or null when
///          nothing is found or PATH is not set.
string findExecutablePath(string executableName) {
    import std.string : split;
    version (Windows) {
        if (!executableName.endsWith(".exe"))
            executableName = executableName ~ ".exe";
    }
    string currentExeDir = dirName(thisExePath());
    string inCurrentExeDir = absolutePath(buildNormalizedPath(currentExeDir, executableName));
    if (exists(inCurrentExeDir) && isFile(inCurrentExeDir))
        return inCurrentExeDir; // found next to the running executable
    string pathVariable = environment.get("PATH");
    if (!pathVariable)
        return null;
    string[] paths = pathVariable.split(pathSeparator);
    foreach(path; paths) {
        string pathname = absolutePath(buildNormalizedPath(path, executableName));
        // exists() is checked first because isFile() throws for a missing path
        if (exists(pathname) && isFile(pathname))
            return pathname;
    }
    return null;
}
485