supervisor.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. // Package supervisor provides a generic API to watch and manage Unix processes.
  2. package supervisor
  3. import (
  4. "bytes"
  5. "fmt"
  6. "os"
  7. "os/exec"
  8. "os/user"
  9. "strconv"
  10. "sync"
  11. "syscall"
  12. "time"
  13. "github.com/Sirupsen/logrus"
  14. )
  // State identifies one state of the process supervision FSM.
type State uint

// The states a supervised process can be in. UNKNOWN is the zero value.
const (
	UNKNOWN State = iota
	RUNNING
	STOPPED
	STARTING
	STOPPING
	FAILED
	EXITED
)

// String returns the upper-case name of the state. Any value that is not one
// of the declared constants is reported as "UNKNOWN".
func (s State) String() string {
	names := [...]string{
		UNKNOWN:  "UNKNOWN",
		RUNNING:  "RUNNING",
		STOPPED:  "STOPPED",
		STARTING: "STARTING",
		STOPPING: "STOPPING",
		FAILED:   "FAILED",
		EXITED:   "EXITED",
	}
	// Compare as unsigned values so that very large states cannot index the
	// table out of range.
	if uint(s) < uint(len(names)) {
		return names[s]
	}
	return "UNKNOWN"
}
  45. type transition struct {
  46. currState State
  47. nextState State
  48. }
  49. // Transition Table
  50. var tt = []transition{
  51. transition{currState: STOPPED, nextState: STARTING},
  52. transition{currState: STARTING, nextState: RUNNING},
  53. transition{currState: STARTING, nextState: STARTING},
  54. transition{currState: STARTING, nextState: FAILED},
  55. transition{currState: RUNNING, nextState: STOPPING},
  56. transition{currState: RUNNING, nextState: EXITED},
  57. transition{currState: STOPPING, nextState: STOPPED},
  58. transition{currState: STOPPING, nextState: STOPPING},
  59. }
  60. // Supervisable is an interface that represents a process.
  61. type Supervisable interface {
  62. Start()
  63. Stop()
  64. Restart()
  65. Status() State
  66. }
  67. // Process represents a unix process to be supervised.
  68. type Process struct {
  69. lock *sync.RWMutex
  70. state State
  71. maxRetry uint
  72. cmd *exec.Cmd
  73. executable string
  74. wdir string
  75. args []string
  76. done chan error
  77. stop chan bool
  78. out *bytes.Buffer
  79. stateChangeCond *sync.Cond
  80. // stdin io.WriteCloser
  81. // stdoutLog Logger
  82. // stderrLog Logger
  83. }
  84. // NewProcess returns a new process to be supervised.
  85. func NewProcess(executable string, dir string, args []string) (*Process, error) {
  86. // initialize process and set the state to STOPPED without transitioning to it.
  87. p := Process{}
  88. if !isExist(executable) {
  89. return &p, fmt.Errorf("executable can not be found: %s", executable)
  90. }
  91. p.maxRetry = 3
  92. p.executable = executable
  93. p.wdir = dir
  94. p.args = args
  95. p.lock = &sync.RWMutex{}
  96. p.stateChangeCond = sync.NewCond(&sync.RWMutex{})
  97. p.state = STOPPED
  98. p.done = make(chan error)
  99. p.stop = make(chan bool)
  100. p.out = new(bytes.Buffer)
  101. return &p, nil
  102. }
  // isExist reports whether the given executable path can be found on the
// filesystem.
//
// It returns true only when os.Stat succeeds. Any stat failure counts as
// "not found": a missing path, a path component that is not a directory, or a
// path that cannot be inspected (for example because of permissions).
// Treating only "does not exist" errors as absence would report such
// unusable paths as present.
func isExist(executable string) bool {
	_, err := os.Stat(executable)
	return err == nil
}
  110. // waitFor blocks until the FSM transitions to the given state.
  111. func (p *Process) waitFor(state State) {
  112. for p.state != state {
  113. p.stateChangeCond.L.Lock()
  114. p.stateChangeCond.Wait()
  115. p.stateChangeCond.L.Unlock()
  116. }
  117. }
  118. // WaitFor blocks until the FSM transitions to the given state.
  119. func WaitFor(process *Process, state State) {
  120. process.waitFor(state)
  121. }
  122. // Start will run the process.
  123. func (p *Process) Start() {
  124. p.transitionTo(STARTING)
  125. }
  126. // Stop will cause the process to stop.
  127. func (p *Process) Stop() {
  128. p.transitionTo(STOPPING)
  129. }
  130. // Restart will cause a running process to restart.
  131. func (p *Process) Restart() {
  132. if p.state == RUNNING {
  133. p.Stop()
  134. }
  135. p.waitFor(STOPPED)
  136. p.Start()
  137. }
  138. // Status returns the current state of the FSM.
  139. func (p *Process) Status() State {
  140. return p.state
  141. }
  142. func (p *Process) permittable(state State) bool {
  143. p.lock.Lock()
  144. defer p.lock.Unlock()
  145. for _, t := range tt {
  146. if p.state == t.currState && t.nextState == state {
  147. return true
  148. }
  149. }
  150. return false
  151. }
  152. func (p *Process) setState(state State) {
  153. p.lock.Lock()
  154. p.state = state
  155. p.lock.Unlock()
  156. }
  157. func (p *Process) transitionTo(state State) {
  158. if p.permittable(state) {
  159. p.stateChangeCond.L.Lock()
  160. logrus.WithField("cmd", p.executable).Debugf("transition: '%s' -> '%s'", p.state, state)
  161. if p.out.Len() > 0 {
  162. logrus.Debugf("STDOUT(err): %s", p.out.String())
  163. }
  164. p.setState(state)
  165. go p.run(state)()
  166. p.stateChangeCond.L.Unlock()
  167. p.stateChangeCond.Broadcast()
  168. return
  169. }
  170. logrus.Errorf("transition to '%s' from '%s' is not permitted!", p.state, state)
  171. return
  172. }
  173. func (p *Process) newCommand() *exec.Cmd {
  174. cmd := exec.Command(p.executable)
  175. cmd.Stdout = p.out
  176. cmd.Stderr = p.out
  177. cmd.Dir = p.wdir
  178. cmd.Args = append([]string{p.executable}, p.args...)
  179. currUsr, err := user.Current()
  180. if err != nil {
  181. logrus.Errorf("can not get current running user: %v", err)
  182. }
  183. uid, err := strconv.Atoi(currUsr.Uid)
  184. if err != nil {
  185. panic(fmt.Sprintf("can not convert string to int %s: %v", currUsr.Uid, err))
  186. }
  187. gid, err := strconv.Atoi(currUsr.Gid)
  188. if err != nil {
  189. panic(fmt.Sprintf("can not convert string to int %s: %v", currUsr.Gid, err))
  190. }
  191. cmd.SysProcAttr = &syscall.SysProcAttr{Credential: &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)}}
  192. return cmd
  193. }
  194. func (p *Process) run(state State) func() {
  195. switch state {
  196. case STOPPED:
  197. return func() {
  198. p.done = make(chan error)
  199. p.stop = make(chan bool)
  200. }
  201. case STARTING:
  202. return func() {
  203. // Prepare the command and start.
  204. var err error
  205. for i := uint(1); i <= p.maxRetry; i++ {
  206. // Prepare the command to run.
  207. p.lock.Lock()
  208. p.cmd = p.newCommand()
  209. p.lock.Unlock()
  210. logrus.Debugf("running %s", p.executable)
  211. err = p.cmd.Start()
  212. if err != nil {
  213. logrus.Debugf("process can not be started: %v", err)
  214. logrus.Debugf("retrying... (%d/%d)", i, p.maxRetry)
  215. continue
  216. }
  217. break
  218. }
  219. // Max retry reached process still not started.
  220. if err != nil {
  221. p.transitionTo(FAILED)
  222. return
  223. }
  224. // Process started successfully.
  225. logrus.Debugf("process is started %s PID %d", p.executable, p.cmd.Process.Pid)
  226. // Process Observer
  227. go func() {
  228. err := p.cmd.Wait()
  229. if err != nil {
  230. logrus.Error(err)
  231. close(p.done)
  232. return
  233. }
  234. p.done <- err
  235. close(p.done)
  236. }()
  237. p.transitionTo(RUNNING)
  238. }
  239. case RUNNING:
  240. return func() {
  241. // Stop Observer
  242. go func() {
  243. select {
  244. // process is ordered to stop.
  245. case <-p.stop:
  246. p.transitionTo(STOPPING)
  247. return
  248. // process exited on it's own
  249. case err := <-p.done:
  250. if p.state == RUNNING {
  251. logrus.Infof("process exited: %v", err)
  252. p.transitionTo(EXITED)
  253. return
  254. }
  255. }
  256. }()
  257. }
  258. case STOPPING:
  259. return func() {
  260. gracefullyStopped := false
  261. // first try to kill the process, gracefully
  262. err := p.cmd.Process.Signal(os.Interrupt)
  263. if err != nil {
  264. logrus.Errorf("interrupt signal returned error: %v", err)
  265. }
  266. for i := uint(1); i <= p.maxRetry; i++ {
  267. select {
  268. case <-time.After(3 * time.Second):
  269. logrus.Debugf("retrying... (%d/%d)", i, p.maxRetry)
  270. err := p.cmd.Process.Signal(os.Interrupt)
  271. if err != nil {
  272. logrus.Errorf("interrupt signal returned error: %v", err)
  273. }
  274. case err = <-p.done:
  275. if err == nil {
  276. gracefullyStopped = true
  277. break
  278. }
  279. logrus.Debugf("process stopped with error: %v", err)
  280. break
  281. }
  282. }
  283. // process didn't exit and retry count is full
  284. // hard killing
  285. if !gracefullyStopped {
  286. err := p.cmd.Process.Kill()
  287. if err != nil {
  288. logrus.Fatal("can not kill process!")
  289. }
  290. <-p.done
  291. }
  292. logrus.Debugf("process stopped %s", p.executable)
  293. p.transitionTo(STOPPED)
  294. }
  295. case FAILED:
  296. return func() {
  297. logrus.Fatalf("failed to launch process: %s", p.executable)
  298. }
  299. case EXITED:
  300. return func() {
  301. logrus.Errorf("process exited unexpectedly: %s", p.executable)
  302. os.Exit(1)
  303. }
  304. default: // UNKNOWN
  305. return nil
  306. }
  307. }