supervisor.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. // Package supervisor provides a generic API to watch and manage Unix processes.
  2. package supervisor
  3. import (
  4. "bytes"
  5. "fmt"
  6. "os"
  7. "os/exec"
  8. "os/user"
  9. "strconv"
  10. "sync"
  11. "syscall"
  12. "time"
  13. "github.com/Sirupsen/logrus"
  14. )
  // State identifies one node of the supervision finite state machine (FSM).
  type State uint

  // The states a supervised process can be in. UNKNOWN is the zero value of
  // State; the remaining values follow in declaration order.
  const (
  	UNKNOWN State = iota
  	RUNNING
  	STOPPED
  	STARTING
  	STOPPING
  	FAILED
  	EXITED
  )

  // String returns the upper-case name of the state. Any value that is not one
  // of the declared states is reported as "UNKNOWN".
  func (s State) String() string {
  	names := [...]string{
  		UNKNOWN:  "UNKNOWN",
  		RUNNING:  "RUNNING",
  		STOPPED:  "STOPPED",
  		STARTING: "STARTING",
  		STOPPING: "STOPPING",
  		FAILED:   "FAILED",
  		EXITED:   "EXITED",
  	}
  	if s >= State(len(names)) {
  		return "UNKNOWN"
  	}
  	return names[s]
  }
  45. type transition struct {
  46. currState State
  47. nextState State
  48. }
  49. // Transition Table
  50. var tt = []transition{
  51. transition{currState: STOPPED, nextState: STARTING},
  52. transition{currState: STARTING, nextState: RUNNING},
  53. transition{currState: STARTING, nextState: STARTING},
  54. transition{currState: STARTING, nextState: FAILED},
  55. transition{currState: RUNNING, nextState: STOPPING},
  56. transition{currState: RUNNING, nextState: EXITED},
  57. transition{currState: STOPPING, nextState: STOPPED},
  58. transition{currState: STOPPING, nextState: STOPPING},
  59. }
  60. // Supervisable is an interface that represents a process.
  61. type Supervisable interface {
  62. Start()
  63. Stop()
  64. Restart()
  65. Status() State
  66. }
  67. // Process represents a unix process to be supervised.
  68. type Process struct {
  69. lock *sync.RWMutex
  70. state State
  71. maxRetry uint
  72. cmd *exec.Cmd
  73. executable string
  74. wdir string
  75. args []string
  76. done chan error
  77. stop chan bool
  78. out *bytes.Buffer
  79. stateChangeCond *sync.Cond
  80. // stdin io.WriteCloser
  81. // stdoutLog Logger
  82. // stderrLog Logger
  83. }
  84. // NewProcess returns a new process to be supervised.
  85. func NewProcess(executable string, dir string, args []string) (*Process, error) {
  86. // initialize process and set the state to STOPPED without transitioning to it.
  87. p := Process{}
  88. if !isExist(executable) {
  89. return &p, fmt.Errorf("executable can not be found: %s", executable)
  90. }
  91. p.maxRetry = 3
  92. p.executable = executable
  93. p.wdir = dir
  94. p.args = args
  95. p.lock = &sync.RWMutex{}
  96. p.stateChangeCond = sync.NewCond(&sync.RWMutex{})
  97. p.state = STOPPED
  98. p.done = make(chan error)
  99. p.stop = make(chan bool)
  100. p.out = new(bytes.Buffer)
  101. return &p, nil
  102. }
  // isExist reports whether the given executable is present on the filesystem.
  //
  // Only a "does not exist" result from os.Stat counts as absent; any other
  // outcome, including other stat errors, is reported as present.
  func isExist(executable string) bool {
  	_, statErr := os.Stat(executable)
  	return !os.IsNotExist(statErr)
  }
  110. // waitFor blocks until the FSM transitions to the given state.
  111. func (p *Process) waitFor(state State) {
  112. for p.state != state {
  113. p.stateChangeCond.L.Lock()
  114. p.stateChangeCond.Wait()
  115. p.stateChangeCond.L.Unlock()
  116. }
  117. }
  118. // Start will run the process.
  119. func (p *Process) Start() {
  120. p.transitionTo(STARTING)
  121. }
  122. // Stop will cause the process to stop.
  123. func (p *Process) Stop() {
  124. p.transitionTo(STOPPING)
  125. }
  126. // Restart will cause a running process to restart.
  127. func (p *Process) Restart() {
  128. if p.state == RUNNING {
  129. p.Stop()
  130. }
  131. p.waitFor(STOPPED)
  132. p.Start()
  133. }
  134. // Status returns the current state of the FSM.
  135. func (p *Process) Status() State {
  136. return p.state
  137. }
  138. func (p *Process) permittable(state State) bool {
  139. p.lock.Lock()
  140. defer p.lock.Unlock()
  141. for _, t := range tt {
  142. if p.state == t.currState && t.nextState == state {
  143. return true
  144. }
  145. }
  146. return false
  147. }
  148. func (p *Process) setState(state State) {
  149. p.lock.Lock()
  150. p.state = state
  151. p.lock.Unlock()
  152. }
  153. func (p *Process) transitionTo(state State) {
  154. if p.permittable(state) {
  155. p.stateChangeCond.L.Lock()
  156. logrus.WithField("cmd", p.executable).Debugf("transition: '%s' -> '%s'", p.state, state)
  157. if p.out.Len() > 0 {
  158. logrus.Debugf("STDOUT(err): %s", p.out.String())
  159. }
  160. p.setState(state)
  161. go p.run(state)()
  162. p.stateChangeCond.L.Unlock()
  163. p.stateChangeCond.Broadcast()
  164. return
  165. }
  166. logrus.Errorf("transition to '%s' from '%s' is not permitted!", p.state, state)
  167. return
  168. }
  169. func (p *Process) newCommand() *exec.Cmd {
  170. cmd := exec.Command(p.executable)
  171. cmd.Stdout = p.out
  172. cmd.Stderr = p.out
  173. cmd.Dir = p.wdir
  174. cmd.Args = append([]string{p.executable}, p.args...)
  175. currUsr, err := user.Current()
  176. if err != nil {
  177. logrus.Errorf("can not get current running user: %v", err)
  178. }
  179. uid, err := strconv.Atoi(currUsr.Uid)
  180. if err != nil {
  181. panic(fmt.Sprintf("can not convert string to int %s: %v", currUsr.Uid, err))
  182. }
  183. gid, err := strconv.Atoi(currUsr.Gid)
  184. if err != nil {
  185. panic(fmt.Sprintf("can not convert string to int %s: %v", currUsr.Gid, err))
  186. }
  187. cmd.SysProcAttr = &syscall.SysProcAttr{Credential: &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)}}
  188. return cmd
  189. }
  190. func (p *Process) run(state State) func() {
  191. switch state {
  192. case STOPPED:
  193. return func() {
  194. p.done = make(chan error)
  195. p.stop = make(chan bool)
  196. }
  197. case STARTING:
  198. return func() {
  199. // Prepare the command and start.
  200. var err error
  201. for i := uint(1); i <= p.maxRetry; i++ {
  202. // Prepare the command to run.
  203. p.lock.Lock()
  204. p.cmd = p.newCommand()
  205. p.lock.Unlock()
  206. logrus.Debugf("running %s", p.executable)
  207. err = p.cmd.Start()
  208. if err != nil {
  209. logrus.Debugf("process can not be started: %v", err)
  210. logrus.Debugf("retrying... (%d/%d)", i, p.maxRetry)
  211. continue
  212. }
  213. break
  214. }
  215. // Max retry reached process still not started.
  216. if err != nil {
  217. p.transitionTo(FAILED)
  218. return
  219. }
  220. // Process started successfully.
  221. logrus.Debugf("process is started %s PID %d", p.executable, p.cmd.Process.Pid)
  222. // Process Observer
  223. go func() {
  224. err := p.cmd.Wait()
  225. if err != nil {
  226. logrus.Error(err)
  227. close(p.done)
  228. return
  229. }
  230. p.done <- err
  231. close(p.done)
  232. }()
  233. p.transitionTo(RUNNING)
  234. }
  235. case RUNNING:
  236. return func() {
  237. // Stop Observer
  238. go func() {
  239. select {
  240. // process is ordered to stop.
  241. case <-p.stop:
  242. p.transitionTo(STOPPING)
  243. return
  244. // process exited on it's own
  245. case err := <-p.done:
  246. if p.state == RUNNING {
  247. logrus.Infof("process exited: %v", err)
  248. p.transitionTo(EXITED)
  249. return
  250. }
  251. }
  252. }()
  253. }
  254. case STOPPING:
  255. return func() {
  256. gracefullyStopped := false
  257. // first try to kill the process, gracefully
  258. err := p.cmd.Process.Signal(os.Interrupt)
  259. if err != nil {
  260. logrus.Errorf("interrupt signal returned error: %v", err)
  261. }
  262. for i := uint(1); i <= p.maxRetry; i++ {
  263. select {
  264. case <-time.After(3 * time.Second):
  265. logrus.Debugf("retrying... (%d/%d)", i, p.maxRetry)
  266. err := p.cmd.Process.Signal(os.Interrupt)
  267. if err != nil {
  268. logrus.Errorf("interrupt signal returned error: %v", err)
  269. }
  270. case err = <-p.done:
  271. if err == nil {
  272. gracefullyStopped = true
  273. break
  274. }
  275. logrus.Debugf("process stopped with error: %v", err)
  276. break
  277. }
  278. }
  279. // process didn't exit and retry count is full
  280. // hard killing
  281. if !gracefullyStopped {
  282. err := p.cmd.Process.Kill()
  283. if err != nil {
  284. logrus.Fatal("can not kill process!")
  285. }
  286. <-p.done
  287. }
  288. logrus.Debugf("process stopped %s", p.executable)
  289. p.transitionTo(STOPPED)
  290. }
  291. case FAILED:
  292. return func() {
  293. logrus.Fatalf("failed to launch process: %s", p.executable)
  294. }
  295. case EXITED:
  296. return func() {
  297. logrus.Errorf("process exited unexpectedly: %s", p.executable)
  298. os.Exit(1)
  299. }
  300. default: // UNKNOWN
  301. return nil
  302. }
  303. }