supervisor.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. // Package supervisor provides a generic API to watch and manage Unix processes.
  2. package supervisor
  3. import (
  4. "bytes"
  5. "fmt"
  6. "os"
  7. "os/exec"
  8. "os/user"
  9. "strconv"
  10. "sync"
  11. "syscall"
  12. "time"
  13. "github.com/Sirupsen/logrus"
  14. )
  // State is a state of the finite-state machine (FSM) that drives a Process.
type State uint

// States of the FSM. UNKNOWN is the zero value.
const (
	UNKNOWN State = iota
	RUNNING
	STOPPED
	STARTING
	STOPPING
	FAILED
	EXITED
)

// stateNames holds the printable name of every known State, indexed by value.
var stateNames = [...]string{
	UNKNOWN:  "UNKNOWN",
	RUNNING:  "RUNNING",
	STOPPED:  "STOPPED",
	STARTING: "STARTING",
	STOPPING: "STOPPING",
	FAILED:   "FAILED",
	EXITED:   "EXITED",
}

// String implements fmt.Stringer. Any value outside the known set prints as
// "UNKNOWN".
func (s State) String() string {
	if s < State(len(stateNames)) {
		return stateNames[s]
	}
	return "UNKNOWN"
}
  45. type transition struct {
  46. currState State
  47. nextState State
  48. }
  49. // Transition Table
  50. var tt = []transition{
  51. transition{currState: STOPPED, nextState: STARTING},
  52. transition{currState: STARTING, nextState: RUNNING},
  53. transition{currState: STARTING, nextState: STARTING},
  54. transition{currState: STARTING, nextState: FAILED},
  55. transition{currState: RUNNING, nextState: STOPPING},
  56. transition{currState: RUNNING, nextState: EXITED},
  57. transition{currState: STOPPING, nextState: STOPPED},
  58. transition{currState: STOPPING, nextState: STOPPING},
  59. }
  60. // Process represents a unix process to be supervised.
  61. type Process struct {
  62. lock *sync.RWMutex
  63. state State
  64. maxRetry uint
  65. cmd *exec.Cmd
  66. executable string
  67. wdir string
  68. args []string
  69. done chan error
  70. stop chan bool
  71. out *bytes.Buffer
  72. stateChangeCond *sync.Cond
  73. // stdin io.WriteCloser
  74. // stdoutLog Logger
  75. // stderrLog Logger
  76. }
  77. // NewProcess returns a new process to be supervised.
  78. func NewProcess(executable string, dir string, args []string) (*Process, error) {
  79. // initialize process and set the state to STOPPED without transitioning to it.
  80. p := Process{}
  81. if !isExist(executable) {
  82. return &p, fmt.Errorf("executable can not be found: %s", executable)
  83. }
  84. p.maxRetry = 3
  85. p.executable = executable
  86. p.wdir = dir
  87. p.args = args
  88. p.lock = &sync.RWMutex{}
  89. p.stateChangeCond = sync.NewCond(&sync.RWMutex{})
  90. p.state = STOPPED
  91. p.done = make(chan error)
  92. p.stop = make(chan bool)
  93. p.out = new(bytes.Buffer)
  94. return &p, nil
  95. }
  // isExist reports whether executable is present on the filesystem. Only a
// "does not exist" error from os.Stat counts as absent; any other Stat error
// (for example, permission denied) is treated as present.
func isExist(executable string) bool {
	_, err := os.Stat(executable)
	return !os.IsNotExist(err)
}
  103. // waitFor blocks until the FSM transitions to the given state.
  104. func (p *Process) waitFor(state State) {
  105. for p.state != state {
  106. p.stateChangeCond.L.Lock()
  107. p.stateChangeCond.Wait()
  108. p.stateChangeCond.L.Unlock()
  109. }
  110. }
  111. // Start will run the process.
  112. func (p *Process) Start() {
  113. p.transitionTo(STARTING)
  114. }
  115. // Stop will cause the process to stop.
  116. func (p *Process) Stop() {
  117. p.transitionTo(STOPPING)
  118. }
  119. // Restart will cause a running process to restart.
  120. func (p *Process) Restart() {
  121. if p.state == RUNNING {
  122. p.Stop()
  123. }
  124. p.waitFor(STOPPED)
  125. p.Start()
  126. }
  127. // Status returns the current state of the FSM.
  128. func (p *Process) Status() State {
  129. return p.state
  130. }
  131. // IsRunning retunrs wether the process is running or not.
  132. func (p *Process) IsRunning() bool {
  133. return p.state == RUNNING
  134. }
  135. func (p *Process) permittable(state State) bool {
  136. p.lock.Lock()
  137. defer p.lock.Unlock()
  138. for _, t := range tt {
  139. if p.state == t.currState && t.nextState == state {
  140. return true
  141. }
  142. }
  143. return false
  144. }
  145. func (p *Process) setState(state State) {
  146. p.lock.Lock()
  147. p.state = state
  148. p.lock.Unlock()
  149. }
  150. func (p *Process) transitionTo(state State) {
  151. if p.permittable(state) {
  152. p.stateChangeCond.L.Lock()
  153. logrus.Infof("transition: '%s' -> '%s'", p.state, state)
  154. logrus.Debug(p.out.String())
  155. p.setState(state)
  156. go p.run(state)()
  157. p.stateChangeCond.L.Unlock()
  158. p.stateChangeCond.Broadcast()
  159. return
  160. }
  161. logrus.Errorf("transition to '%s' from '%s' is not permitted!", p.state, state)
  162. return
  163. }
  164. func (p *Process) newCommand() *exec.Cmd {
  165. cmd := exec.Command(p.executable)
  166. cmd.Stdout = p.out
  167. cmd.Stderr = p.out
  168. cmd.Dir = p.wdir
  169. cmd.Args = append([]string{p.executable}, p.args...)
  170. currUsr, err := user.Current()
  171. if err != nil {
  172. logrus.Errorf("can not get current running user: %v", err)
  173. }
  174. uid, err := strconv.Atoi(currUsr.Uid)
  175. if err != nil {
  176. panic(fmt.Sprintf("can not convert string to int %s: %v", currUsr.Uid, err))
  177. }
  178. gid, err := strconv.Atoi(currUsr.Gid)
  179. if err != nil {
  180. panic(fmt.Sprintf("can not convert string to int %s: %v", currUsr.Gid, err))
  181. }
  182. cmd.SysProcAttr = &syscall.SysProcAttr{Credential: &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)}}
  183. return cmd
  184. }
  185. func (p *Process) run(state State) func() {
  186. switch state {
  187. case STOPPED:
  188. return func() {
  189. p.done = make(chan error)
  190. p.stop = make(chan bool)
  191. }
  192. case STARTING:
  193. return func() {
  194. // Prepare the command and start.
  195. var err error
  196. for i := uint(1); i <= p.maxRetry; i++ {
  197. // Prepare the command to run.
  198. p.lock.Lock()
  199. p.cmd = p.newCommand()
  200. p.lock.Unlock()
  201. logrus.Infof("starting the process")
  202. err = p.cmd.Start()
  203. if err != nil {
  204. logrus.Warnf("process can not be started: %v", err)
  205. logrus.Infof("retrying... (%d/%d)", i, p.maxRetry)
  206. continue
  207. }
  208. break
  209. }
  210. // Max retry reached process still not started.
  211. if err != nil {
  212. p.transitionTo(FAILED)
  213. return
  214. }
  215. // Process started successfully.
  216. logrus.Info("process is started")
  217. // Process Observer
  218. go func() {
  219. err := p.cmd.Wait()
  220. if err != nil {
  221. logrus.Error(err)
  222. close(p.done)
  223. return
  224. }
  225. p.done <- err
  226. close(p.done)
  227. }()
  228. logrus.Info("observer goroutine launched")
  229. p.transitionTo(RUNNING)
  230. }
  231. case RUNNING:
  232. return func() {
  233. // Stop Observer
  234. go func() {
  235. select {
  236. // process is ordered to stop.
  237. case <-p.stop:
  238. p.transitionTo(STOPPING)
  239. return
  240. // process exited on it's own
  241. case err := <-p.done:
  242. if p.state == RUNNING {
  243. logrus.Infof("process exited: %v", err)
  244. p.transitionTo(EXITED)
  245. return
  246. }
  247. }
  248. }()
  249. }
  250. case STOPPING:
  251. return func() {
  252. gracefullyStopped := false
  253. // first try to kill the process, gracefully
  254. err := p.cmd.Process.Signal(os.Interrupt)
  255. if err != nil {
  256. logrus.Errorf("interrupt signal returned error: %v", err)
  257. }
  258. for i := uint(1); i <= p.maxRetry; i++ {
  259. select {
  260. case <-time.After(3 * time.Second):
  261. logrus.Infof("retrying... (%d/%d)", i, p.maxRetry)
  262. err := p.cmd.Process.Signal(os.Interrupt)
  263. if err != nil {
  264. logrus.Errorf("interrupt signal returned error: %v", err)
  265. }
  266. case err = <-p.done:
  267. if err == nil {
  268. gracefullyStopped = true
  269. break
  270. }
  271. logrus.Errorf("process stopped with error: %v", err)
  272. break
  273. }
  274. }
  275. // process didn't exit and retry count is full
  276. // hard killing
  277. if !gracefullyStopped {
  278. err := p.cmd.Process.Kill()
  279. if err != nil {
  280. logrus.Fatal("can not kill process!")
  281. }
  282. <-p.done
  283. }
  284. logrus.Info("process stopped")
  285. p.transitionTo(STOPPED)
  286. }
  287. case FAILED:
  288. return func() {
  289. logrus.Fatal("process operation failed state")
  290. }
  291. case EXITED:
  292. return func() {
  293. logrus.Errorln("process exited unexpectedly")
  294. logrus.Printf("out: %s", p.out.String())
  295. os.Exit(1)
  296. }
  297. default: // UNKNOWN
  298. return nil
  299. }
  300. }