module dxx.sys.spawn;

/**
Helpers for running an external process and collecting its output on
background threads.

Taken from
https://github.com/buggins/dlangide/blob/master/src/dlangide/builders/extprocess.d
(module dlangide.builders.extprocess).
**/

private import std.process;
private import std.stdio;
private import std.utf;
private import std.stdio;
private import core.thread;
private import core.sync.mutex;

private import dxx.app;

mixin __Text;

/// Sink interface that process output is forwarded to.
interface TextWriter {
    /// Receives a chunk of text (one or more lines).
    void writeText(dstring text);
}

/// Source interface that text can be read back from.
interface TextReader {
    /// Returns the text available so far, or null when there is none.
    dstring readText();
}

/// Mutex-protected text box: one thread appends text, another drains it.
class ProtectedTextStorage : TextReader, TextWriter {

    private Mutex _mutex;
    private shared bool _closed;
    private dchar[] _buffer;

    this() {
        _mutex = new Mutex();
    }

    /// True once close() has been called.
    @property bool closed() { return _closed; }

    /// Marks the storage closed and drops any buffered text.
    /// Further writes are ignored and further reads return null.
    void close() {
        // Take the lock so the buffer is never cleared while another
        // thread is in the middle of appending to or draining it.
        _mutex.lock();
        scope(exit) _mutex.unlock();
        if (_closed)
            return;
        _closed = true;
        _buffer = null;
    }

    /// Appends text to the buffer; does nothing when the storage is closed.
    override void writeText(dstring text) {
        _mutex.lock();
        scope(exit) _mutex.unlock();
        // The closed flag is tested under the lock so a concurrent close()
        // cannot slip in between the test and the append.
        if (_closed)
            return;
        _buffer ~= text;
    }

    /// Returns and clears the buffered text.
    /// Returns null when the buffer is empty or the storage is closed.
    override dstring readText() {
        _mutex.lock();
        scope(exit) _mutex.unlock();
        if (_closed || !_buffer.length)
            return null;
        dstring res = _buffer.dup;
        _buffer = null;
        return res;
    }
}

enum ExternalProcessState : uint {
    /// not initialized
    None,
    /// running
    Running,
    /// stop is requested
    Stopping,
    /// stopped
    Stopped,
    /// error occurred, e.g. cannot run process
    Error
}

/// Base class for reading text from a std.stdio.File in a background thread.
///
/// Bytes are collected until a '\n' (or end of stream), decoded as UTF-8,
/// normalized to '\n' line endings and handed to sendResult().
class BackgroundReaderBase : Thread {
    private std.stdio.File _file;
    private shared bool _finished;
    private ubyte[1] _byteBuffer;
    private ubyte[] _bytes;
    dchar[] _textbuffer;
    private int _len;
    private bool _utfError;

    this(std.stdio.File f) {
        super(&run);
        assert(f.isOpen());
        _file = f;
        _len = 0;
        _finished = false;
    }

    /// True once the reader thread body has completed.
    @property bool finished() {
        return _finished;
    }

    /// Last byte passed to addByte().
    ubyte prevchar;

    /// Appends one raw byte to the pending buffer; a '\n' triggers flush().
    void addByte(ubyte data) {
        // grow geometrically, starting at 1024 bytes
        if (_bytes.length < _len + 1)
            _bytes.length = _bytes.length ? _bytes.length * 2 : 1024;
        _bytes[_len++] = data;
        if (data == '\n')
            flush();
        prevchar = data;
    }

    /// Decodes the pending bytes and passes the resulting text to sendResult().
    void flush() {
        if (!_len)
            return;
        if (_textbuffer.length < _len)
            _textbuffer.length = _len + 256;
        // Decode only the bytes collected since the previous flush. _bytes
        // may still hold stale data past _len from an earlier, longer chunk,
        // and a truncated multi-byte sequence must not pick those bytes up.
        auto pending = cast(string) _bytes[0 .. _len];
        size_t count = 0;
        for (size_t i = 0; i < pending.length;) {
            dchar ch = 0;
            if (_utfError) {
                // after the first decoding failure every byte is passed through as-is
                ch = _bytes[i++];
            } else {
                try {
                    ch = decode(pending, i);
                } catch (UTFException e) {
                    _utfError = true;
                    ch = _bytes[i++];
                    MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_NONUNICODE_PROC_OUTPUT));
                }
            }
            _textbuffer[count++] = ch;
        }
        _len = 0;

        if (!count)
            return;

        // fix line endings - must be '\n'
        count = convertLineEndings(_textbuffer[0 .. count]);

        // data is ready to send
        if (count)
            sendResult(_textbuffer[0 .. count].dup);
    }

    /// In-place conversion of line endings to unix format (\n).
    /// "\r\n", "\n\r", a lone "\r" and a lone "\n" each become a single '\n'.
    /// Returns the length of the converted text.
    size_t convertLineEndings(dchar[] text) {
        size_t src = 0;
        size_t dst = 0;
        while (src < text.length) {
            dchar ch = text[src++];
            dchar nextch = src < text.length ? text[src] : 0;
            if (ch == '\n') {
                if (nextch == '\r')
                    src++;
                text[dst++] = '\n';
            } else if (ch == '\r') {
                if (nextch == '\n')
                    src++;
                text[dst++] = '\n';
            } else {
                text[dst++] = ch;
            }
        }
        return dst;
    }

    /// Called with each decoded chunk of text; override to consume it.
    protected void sendResult(dstring text) {
        // override to deal with ready data
    }

    /// Called at the end of the thread body; override to release resources.
    protected void handleFinish() {
        // override to do something when thread is finishing
    }

    /// Thread body: reads the file one byte at a time until the stream ends.
    private void run() {
        try {
            version (Windows) {
                import core.sys.windows.windows;
                // separate version for windows as workaround for hanging rawRead
                HANDLE h = _file.windowsHandle;
                DWORD bytesRead = 0;
                DWORD err;
                for (;;) {
                    BOOL res = ReadFile(h, _byteBuffer.ptr, 1, &bytesRead, null);
                    if (res) {
                        if (bytesRead == 1)
                            addByte(_byteBuffer[0]);
                    } else {
                        err = GetLastError();
                        if (err == ERROR_MORE_DATA) {
                            if (bytesRead == 1)
                                addByte(_byteBuffer[0]);
                            continue;
                        }
                        // any other error ends the read loop
                        break;
                    }
                }
            } else {
                for (;;) {
                    if (_file.eof)
                        break;
                    ubyte[] r = _file.rawRead(_byteBuffer);
                    if (!r.length)
                        break;
                    addByte(r[0]);
                }
            }
            _file.close();
            // deliver whatever is left after the last '\n'
            flush();
        } catch (Exception e) {
            // best effort: a failing read simply ends the reader
        }
        handleFinish();
        _finished = true;
    }

    /// Blocks until the reader thread has terminated.
    void waitForFinish() {
        // join(false): do not rethrow an exception that ended the thread
        join(false);
    }

}

/// reader which sends output text to TextWriter (warning: call will be made from background thread)
class BackgroundReader : BackgroundReaderBase {
    protected TextWriter _destination;

    this(std.stdio.File f, TextWriter destination) {
        super(f);
        assert(destination);
        _destination = destination;
    }

    /// Forwards each decoded chunk to the destination writer.
    override protected void sendResult(dstring text) {
        _destination.writeText(text);
    }

    override protected void handleFinish() {
        // remove link to destination to help GC
        _destination = null;
    }
}

/// runs external process, catches output, allows to stop
class ExternalProcess {

    protected char[][] _args;
    protected char[] _workDir;
    protected char[] _program;
    protected string[string] _env;
    protected TextWriter _stdout;
    protected TextWriter _stderr;
    protected BackgroundReader _stdoutReader;
    protected BackgroundReader _stderrReader;
    protected ProcessPipes _pipes;
    protected ExternalProcessState _state;

    protected int _result;

    /// current state of the process
    @property ExternalProcessState state() { return _state; }
    /// returns process result for stopped process
    @property int result() { return _result; }

    this() {
    }

    /// Convenience overload taking immutable strings; copies them and forwards
    /// to the char[] overload.
    ExternalProcessState run(string program, string[]args, string dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
        char[][] arguments;
        foreach(a; args)
            arguments ~= a.dup;
        return run(program.dup, arguments, dir.dup, stdoutTarget, stderrTarget);
    }

    /// Starts the program and the background readers for its output.
    ///
    /// Params:
    ///   program      = executable name or path, resolved with findExecutablePath
    ///   args         = arguments passed after the program name
    ///   dir          = working directory for the child process
    ///   stdoutTarget = receives the child's stdout (must not be null)
    ///   stderrTarget = receives the child's stderr; when null, stderr is
    ///                  redirected into stdout
    /// Returns: Running on success, Error when the executable cannot be found
    ///          or the process cannot be started.
    ExternalProcessState run(char[] program, char[][]args, char[] dir, TextWriter stdoutTarget, TextWriter stderrTarget = null) {
        MsgLog.trace(MsgText!(DXXConfig.messages.MSG_PROC_RUN)(program,args));
        _state = ExternalProcessState.None;
        _program = findExecutablePath(cast(string)program).dup;
        if (!_program.length) {
            _state = ExternalProcessState.Error;
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_NOT_FOUND)(program));
            return _state;
        }
        _args = args;
        _workDir = dir;
        _stdout = stdoutTarget;
        _stderr = stderrTarget;
        _result = 0;
        assert(_stdout);
        Redirect redirect;
        char[][] params;
        params ~= _program;
        params ~= _args;
        if (!_stderr)
            redirect = Redirect.stdout | Redirect.stderrToStdout | Redirect.stdin;
        else
            redirect = Redirect.all;
        try {
            _pipes = pipeProcess(params, redirect, _env, std.process.Config.suppressConsole, _workDir);
            _state = ExternalProcessState.Running;
            // start readers
            _stdoutReader = new BackgroundReader(_pipes.stdout, _stdout);
            _stdoutReader.start();
            if (_stderr) {
                _stderrReader = new BackgroundReader(_pipes.stderr, _stderr);
                _stderrReader.start();
            }
        } catch (ProcessException e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_RUN)(program,e));
            markLaunchFailure();
        } catch (std.stdio.StdioException e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_REDIR)(program,e));
            markLaunchFailure();
        } catch (Throwable e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_UNKNOWN)(program,e));
            markLaunchFailure();
        }
        return _state;
    }

    /// Records a failed launch as Error. A process that did start keeps the
    /// Running state so it can still be polled, waited on or killed.
    private void markLaunchFailure() {
        if (_state != ExternalProcessState.Running)
            _state = ExternalProcessState.Error;
    }

    /// Waits for both reader threads to finish and drops the references to them.
    protected void waitForReadingCompletion() {
        try {
            if (_stdoutReader && !_stdoutReader.finished) {
                _pipes.stdout.detach();
                _stdoutReader.waitForFinish();
            }
            _stdoutReader = null;
        } catch (Exception e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_WAITING_STDOUT)(_program,e));
        }
        try {
            if (_stderrReader && !_stderrReader.finished) {
                _pipes.stderr.detach();
                _stderrReader.waitForFinish();
            }
            // cleared even when the reader had already finished, same as stdout
            _stderrReader = null;
        } catch (Exception e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_WAITING_STDERR)(_program,e));
        }
    }

    /// Checks, without blocking on the process, whether it has terminated.
    /// On termination the exit status is stored and the readers are drained.
    ExternalProcessState poll() {
        if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
            return _state;
        // check for process finishing
        try {
            auto pstate = std.process.tryWait(_pipes.pid);
            if (pstate.terminated) {
                _state = ExternalProcessState.Stopped;
                _result = pstate.status;
                waitForReadingCompletion();
            }
        } catch (Exception e) {
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_WAITING)(_program));
            _state = ExternalProcessState.Error;
        }
        return _state;
    }

    /// waits until termination
    ExternalProcessState wait() {
        MsgLog.info(MsgText!(DXXConfig.messages.MSG_PROC_WAITING));
        if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
            return _state;
        try {
            _result = std.process.wait(_pipes.pid);
            _state = ExternalProcessState.Stopped;
            MsgLog.trace(MsgText!(DXXConfig.messages.MSG_PROC_READWAITING));
            waitForReadingCompletion();
        } catch (Exception e) {
            // same message and arguments as the "unknown error" path in run()
            MsgLog.error(MsgText!(DXXConfig.messages.MSG_ERR_PROC_UNKNOWN)(_program, e));
            _state = ExternalProcessState.Error;
        }
        return _state;
    }

    /// request process stop
    ExternalProcessState kill() {
        MsgLog.info(MsgText!(DXXConfig.messages.MSG_PROC_KILL));

        if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
            return _state;
        if (_state == ExternalProcessState.Running) {
            std.process.kill(_pipes.pid);
            _state = ExternalProcessState.Stopping;
        }
        return _state;
    }

    /// Writes data to the child's stdin and flushes it.
    /// Returns false when the process is not in a state that accepts input.
    bool write(string data) {
        if (_state == ExternalProcessState.Error || _state == ExternalProcessState.None || _state == ExternalProcessState.Stopped)
            return false;
        _pipes.stdin.write(data);
        _pipes.stdin.flush();
        return true;
    }
}

private import std.algorithm;
private import std.process;
private import std.path;
private import std.file;
private import std.utf;

/// For an executable name without a path, finds the absolute path to the executable.
///
/// Looks first in the directory of the running executable, then in each
/// directory listed in the PATH environment variable. On Windows ".exe" is
/// appended when the name does not already end with it.
///
/// Returns: absolute path of the first matching file, or null when none is found.
string findExecutablePath(string executableName) {
    import std.string : split;
    version (Windows) {
        if (!executableName.endsWith(".exe"))
            executableName = executableName ~ ".exe";
    }
    string currentExeDir = dirName(thisExePath());
    string inCurrentExeDir = absolutePath(buildNormalizedPath(currentExeDir, executableName));
    if (exists(inCurrentExeDir) && isFile(inCurrentExeDir))
        return inCurrentExeDir; // found next to the running executable
    string pathVariable = environment.get("PATH");
    if (!pathVariable)
        return null;
    string[] paths = pathVariable.split(pathSeparator);
    foreach(path; paths) {
        string pathname = absolutePath(buildNormalizedPath(path, executableName));
        if (exists(pathname) && isFile(pathname))
            return pathname;
    }
    return null;
}