1 module dxx.sys.spawn;
2 
3 /**
4 taken from https://github.com/buggins/dlangide/blob/master/src/dlangide/builders/extprocess.d
5 module dlangide.builders.extprocess;
6 
7 **/
8 
9 private import std.process;
10 private import std.stdio;
11 private import std.utf;
12 private import std.stdio;
13 private import core.thread;
14 private import core.sync.mutex;
15 
16 private import dxx.app;
17 
18 mixin __Text;
19 
20 /// interface to forward process output to
21 interface TextWriter {
22     /// log lines
23     void writeText(dstring text);
24 }
25 
26 /// interface to read text
27 interface TextReader {
28     /// log lines
29     dstring readText();
30 }
31 
32 /// protected text storage box to read and write text from different threads
33 class ProtectedTextStorage : TextReader, TextWriter {
34 
35     private Mutex _mutex;
36     private shared bool _closed;
37     private dchar[] _buffer;
38 
39     this() {
40         _mutex = new Mutex();
41     }
42 
43     @property bool closed() { return _closed; }
44 
45     void close() {
46         if (_closed)
47             return;
48         _closed = true;
49         _buffer = null;
50     }
51 
52     /// log lines
53     override void writeText(dstring text) {
54         if (!_closed) {
55             // if not closed
56             _mutex.lock();
57             scope(exit) _mutex.unlock();
58             // append text
59             _buffer ~= text;
60         }
61     }
62 
63     /// log lines
64     override dstring readText() {
65         if (!_closed) {
66             // if not closed
67             _mutex.lock();
68             scope(exit) _mutex.unlock();
69             if (!_buffer.length)
70                 return null;
71             dstring res = _buffer.dup;
72             _buffer = null;
73             return res;
74         } else {
75             // reading from closed
76             return null;
77         }
78     }
79 }
80 
81 enum ExternalProcessState : uint {
82     /// not initialized
83     None,
84     /// running
85     Running,
86     /// stop is requested
87     Stopping,
88     /// stopped
89     Stopped,
90     /// error occured, e.g. cannot run process
91     Error
92 }
93 
94 /// base class for text reading from std.stdio.File in background thread
95 class BackgroundReaderBase : Thread {
96     private std.stdio.File _file;
97     private shared bool _finished;
98     private ubyte[1] _byteBuffer;
99     private ubyte[] _bytes;
100     dchar[] _textbuffer;
101     private int _len;
102     private bool _utfError;
103 
104     this(std.stdio.File f) {
105         super(&run);
106         assert(f.isOpen());
107         _file = f;
108         _len = 0;
109         _finished = false;
110     }
111 
112     @property bool finished() {
113         return _finished;
114     }
115 
116     ubyte prevchar;
117     void addByte(ubyte data) {
118         if (_bytes.length < _len + 1)
119             _bytes.length = _bytes.length ? _bytes.length * 2 : 1024;
120         bool eolchar = (data == '\r' || data == '\n');
121         bool preveol = (prevchar == '\r' || prevchar == '\n');
122         _bytes[_len++] = data;
123         if (data == '\n')
124             flush();
125         //if ((eolchar && !preveol) || (!eolchar && preveol) || data == '\n') {
126         //    //Log.d("Flushing for prevChar=", prevchar, " newChar=", data);
127         //    flush();
128         //}
129         prevchar = data;
130     }
131     void flush() {
132         if (!_len)
133             return;
134         if (_textbuffer.length < _len)
135             _textbuffer.length = _len + 256;
136         size_t count = 0;
137         for(size_t i = 0; i < _len;) {
138             dchar ch = 0;
139             if (_utfError) {
140                 ch = _bytes[i++];
141             } else {
142                 try {
143                     ch = decode(cast(string)_bytes, i);
144                 } catch (UTFException e) {
145                     _utfError = true;
146                     ch = _bytes[i++];
147                     MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_NONUNICODE_PROC_OUTPUT));
148                 }
149             }
150             _textbuffer[count++] = ch;
151         }
152         _len = 0;
153 
154         if (!count)
155             return;
156 
157         // fix line endings - must be '\n'
158         count = convertLineEndings(_textbuffer[0..count]);
159 
160         // data is ready to send
161         if (count)
162             sendResult(_textbuffer[0..count].dup);
163     }
164     /// inplace convert line endings to unix format (\n)
165     size_t convertLineEndings(dchar[] text) {
166         size_t src = 0;
167         size_t dst = 0;
168         for(;src < text.length;) {
169             dchar ch = text[src++];
170             dchar nextch = src < text.length ? text[src] : 0;
171             if (ch == '\n') {
172                 if (nextch == '\r')
173                     src++;
174                 text[dst++] = '\n';
175             } else if (ch == '\r') {
176                 if (nextch == '\n')
177                     src++;
178                 text[dst++] = '\n';
179             } else {
180                 text[dst++] = ch;
181             }
182         }
183         return dst;
184     }
185     protected void sendResult(dstring text) {
186         // override to deal with ready data
187     }
188 
189     protected void handleFinish() {
190         // override to do something when thread is finishing
191     }
192 
193     private void run() {
194         //Log.d("BackgroundReaderBase run() enter");
195         // read file by bytes
196         try {
197             version (Windows) {
198                 import core.sys.windows.windows;
199                 // separate version for windows as workaround for hanging rawRead
200                 HANDLE h = _file.windowsHandle;
201                 DWORD bytesRead = 0;
202                 DWORD err;
203                 for (;;) {
204                     BOOL res = ReadFile(h, _byteBuffer.ptr, 1, &bytesRead, null);
205                     if (res) {
206                         if (bytesRead == 1)
207                             addByte(_byteBuffer[0]);
208                     } else {
209                         err = GetLastError();
210                         if (err == ERROR_MORE_DATA) {
211                             if (bytesRead == 1)
212                                 addByte(_byteBuffer[0]);
213                             continue;
214                         }
215                         //if (err == ERROR_BROKEN_PIPE || err = ERROR_INVALID_HANDLE)
216                         break;
217                     }
218                 }
219             } else {
220                 for (;;) {
221                     //Log.d("BackgroundReaderBase run() reading file");
222                     if (_file.eof)
223                         break;
224                     ubyte[] r = _file.rawRead(_byteBuffer);
225                     if (!r.length)
226                         break;
227                     //Log.d("BackgroundReaderBase run() read byte: ", r[0]);
228                     addByte(r[0]);
229                 }
230             }
231             _file.close();
232             flush();
233             //Log.d("BackgroundReaderBase run() closing file");
234             //Log.d("BackgroundReaderBase run() file closed");
235         } catch (Exception e) {
236             //Log.e("Exception occured while reading stream: ", e);
237         }
238         handleFinish();
239         _finished = true;
240         //Log.d("BackgroundReaderBase run() exit");
241     }
242 
243     void waitForFinish() {
244         static if (false) {
245             while (isRunning && !_finished)
246                 Thread.sleep( dur!("msecs")( 10 ) );
247         } else {
248             join(false);
249         }
250     }
251 
252 }
253 
254 /// reader which sends output text to TextWriter (warning: call will be made from background thread)
255 class BackgroundReader : BackgroundReaderBase {
256     protected TextWriter _destination;
257     this(std.stdio.File f, TextWriter destination) {
258         super(f);
259         assert(destination);
260         _destination = destination;
261     }
262     override protected void sendResult(dstring text) {
263         // override to deal with ready data
264         _destination.writeText(text);
265     }
266     override protected void handleFinish() {
267         // remove link to destination to help GC
268         _destination = null;
269     }
270 }
271 
272 /// runs external process, catches output, allows to stop
273 class ExternalProcess {
274 
275     protected char[][] _args;
276     protected char[] _workDir;
277     protected char[] _program;
278     protected string[string] _env;
279     protected TextWriter _stdout;
280     protected TextWriter _stderr;
281     protected BackgroundReader _stdoutReader;
282     protected BackgroundReader _stderrReader;
283     protected ProcessPipes _pipes;
284     protected ExternalProcessState _state;
285 
286     protected int _result;
287 
288     @property ExternalProcessState state() { return _state; }
289     /// returns process result for stopped process
290     @property int result() { return _result; }
291 
292     this() {
293     }
294 
295     ExternalProcessState run(string program, string[]args, string dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
296         char[][] arguments;
297         foreach(a; args)
298             arguments ~= a.dup;
299         return run(program.dup, arguments, dir.dup, stdoutTarget, stderrTarget);
300     }
301     ExternalProcessState run(char[] program, char[][]args, char[] dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
302         MsgLog.trace(MsgText!(DXXConfig.messages.MSG_PROC_RUN)(program,args));
303         _state = ExternalProcessState.None;
304         _program = findExecutablePath(cast(string)program).dup;
305         if (!_program) {
306             _state = ExternalProcessState.Error;
307             MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_NOT_FOUND)(program));
308             return _state;
309         }
310         _args = args;
311         _workDir = dir;
312         _stdout = stdoutTarget;
313         _stderr = stderrTarget;
314         _result = 0;
315         assert(_stdout);
316         Redirect redirect;
317         char[][] params;
318         params ~= _program;
319         params ~= _args;
320         if (!_stderr)
321             redirect = Redirect.stdout | Redirect.stderrToStdout | Redirect.stdin;
322         else
323             redirect = Redirect.all;
324 //        sharedLog.info("Trying to run program ", _program, " with args ", _args);
325 //        MsgLog.trace(MsgText!(DXXConfig.messages.MSG_PROC_RUN)(program,args));
326         try {
327             _pipes = pipeProcess(params, redirect, _env, std.process.Config.suppressConsole, _workDir);
328             _state = ExternalProcessState.Running;
329             // start readers
330             _stdoutReader = new BackgroundReader(_pipes.stdout, _stdout);
331             _stdoutReader.start();
332             if (_stderr) {
333                 _stderrReader = new BackgroundReader(_pipes.stderr, _stderr);
334                 _stderrReader.start();
335             }
336         } catch (ProcessException e) {
337             MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_RUN)(program,e));
338         } catch (std.stdio.StdioException e) {
339             MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_REDIR)(program,e));
340         } catch (Throwable e) {
341             //sharedLog.error("Exception while trying to run program ", _program, " ", e);
342             MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_UNKNOWN)(program,e));
343         }
344         return _state;
345     }
346 
347     protected void waitForReadingCompletion() {
348         try {
349             if (_stdoutReader && !_stdoutReader.finished) {
350                 _pipes.stdout.detach();
351                 //Log.d("waitForReadingCompletion - waiting for stdout");
352                 _stdoutReader.waitForFinish();
353                 //Log.d("waitForReadingCompletion - joined stdout");
354             }
355             _stdoutReader = null;
356         } catch (Exception e) {
357             MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_WAITING_STDOUT)(_program,e));
358         }
359         try {
360             if (_stderrReader && !_stderrReader.finished) {
361                 _pipes.stderr.detach();
362                 //Log.d("waitForReadingCompletion - waiting for stderr");
363                 _stderrReader.waitForFinish();
364                 _stderrReader = null;
365                 //Log.d("waitForReadingCompletion - joined stderr");
366             }
367         } catch (Exception e) {
368             MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_WAITING_STDERR)(_program,e));
369         }
370         //Log.d("waitForReadingCompletion - done");
371     }
372 
373     /// polls all available output from process streams
374     ExternalProcessState poll() {
375         //Log.d("ExternalProcess.poll state = ", _state);
376         bool res = true;
377         if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
378             return _state;
379         // check for process finishing
380         try {
381             auto pstate = std.process.tryWait(_pipes.pid);
382             if (pstate.terminated) {
383                 _state = ExternalProcessState.Stopped;
384                 _result = pstate.status;
385                 waitForReadingCompletion();
386             }
387         } catch (Exception e) {
388             MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_WAITING)(_program));
389             _state = ExternalProcessState.Error;
390         }
391         return _state;
392     }
393 
394     /// waits until termination
395     ExternalProcessState wait() {
396         MsgLog.info(MsgText!(DXXConfig.messages.MSG_PROC_WAITING));
397         if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
398             return _state;
399         try {
400             _result = std.process.wait(_pipes.pid);
401             _state = ExternalProcessState.Stopped;
402             MsgLog.trace(MsgText!(DXXConfig.messages.MSG_PROC_READWAITING));
403             waitForReadingCompletion();
404         } catch (Exception e) {
405             MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_UNKNOWN));
406             _state = ExternalProcessState.Error;
407         }
408         return _state;
409     }
410 
411     /// request process stop
412     ExternalProcessState kill() {
413         MsgLog.info(MsgText!(DXXConfig.messages.MSG_PROC_KILL));
414         
415         if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
416             return _state;
417         if (_state == ExternalProcessState.Running) {
418             std.process.kill(_pipes.pid);
419             _state = ExternalProcessState.Stopping;
420         }
421         return _state;
422     }
423 
424     bool write(string data) {
425         if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped) {
426             return false;
427         } else {
428             //Log.d("writing ", data.length, " characters to stdin");
429             _pipes.stdin.write("", data);
430             _pipes.stdin.flush();
431             //_pipes.stdin.close();
432             return true;
433         }
434     }
435 }
436 private import std.algorithm;
437 private import std.process;
438 private import std.path;
439 private import std.file;
440 private import std.utf;
441 
442 /// for executable name w/o path, find absolute path to executable
443 string findExecutablePath(string executableName) {
444     import std..string : split;
445     version (Windows) {
446         if (!executableName.endsWith(".exe"))
447             executableName = executableName ~ ".exe";
448     }
449     string currentExeDir = dirName(thisExePath());
450     string inCurrentExeDir = absolutePath(buildNormalizedPath(currentExeDir, executableName));
451     if (exists(inCurrentExeDir) && isFile(inCurrentExeDir))
452         return inCurrentExeDir; // found in current directory
453     string pathVariable = environment.get("PATH");
454     if (!pathVariable)
455         return null;
456     string[] paths = pathVariable.split(pathSeparator);
457     foreach(path; paths) {
458         string pathname = absolutePath(buildNormalizedPath(path, executableName));
459         if (exists(pathname) && isFile(pathname))
460             return pathname;
461     }
462     return null;
463 }
464