worker.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package sizer
import (
	"bufio"
	"fmt"
	"io"
	"os"
	"sync"
)
  8. type dummy struct{}
  9. func (d dummy) Write(p []byte) (n int, err error) {
  10. return 0, nil
  11. }
  12. // Trivella provides an io.Writer with a notion of depth in the
  13. // path graph
  14. type Trivella struct {
  15. Level uint
  16. Writer io.Writer
  17. }
  18. // Scendi increases the depth in the graph
  19. func (t *Trivella) Scendi() *Trivella {
  20. var v *Trivella
  21. if t.Level > 0 {
  22. v = TrInit(t.Writer, t.Level-1)
  23. } else {
  24. var d dummy
  25. v = TrInit(d, 0)
  26. }
  27. return v
  28. }
  29. // TrInit assigns the writer to a Trivella
  30. func TrInit(writer io.Writer, level uint) *Trivella {
  31. return &Trivella{Writer: writer, Level: level}
  32. }
  33. // Ruspa is the asynchronous worker that is in charge of collecting
  34. // the underlying sizes and relay the total to the eventual parent.
  35. type Ruspa struct {
  36. INode *INode
  37. ReportIn chan StatusReport
  38. ReportOut chan StatusReport
  39. DoneIn chan StatusReport
  40. DoneOut chan StatusReport
  41. Opened map[string]*Ruspa
  42. }
  43. // NewRuspa inits a new Ruspa
  44. func NewRuspa(inode *INode, reportOut, doneOut chan StatusReport) *Ruspa {
  45. var r Ruspa
  46. r.INode = inode
  47. r.ReportOut = reportOut
  48. r.DoneOut = doneOut
  49. r.ReportIn = make(chan StatusReport)
  50. r.DoneIn = make(chan StatusReport)
  51. r.Opened = make(map[string]*Ruspa)
  52. return &r
  53. }
  54. // Ammucchia gathers the size of the underlying INode(s) from the Ruspa worker
  55. // and adds to the INode size.
  56. func (w *Ruspa) Ammucchia(t *Trivella) {
  57. Console.Debugln(Gray("Ammucchia:", w.INode.Path))
  58. for len(w.Opened) > 0 {
  59. select {
  60. case report := <-w.ReportIn:
  61. Console.Debugln(Gray("ReportIn:", report))
  62. if report.err == nil {
  63. w.INode.Size += report.size
  64. fmt.Fprintf(t.Writer, "%s: %d\n", w.INode.Path, w.INode.Size)
  65. } else {
  66. Console.Debugln(Red(report.err))
  67. }
  68. case leaf := <-w.DoneIn:
  69. Console.Debugln(Gray("Done:", leaf.path))
  70. delete(w.Opened, leaf.path)
  71. }
  72. }
  73. Console.Debugln(Gray(w.INode.Path, ": Completed. Exiting..."))
  74. w.DoneOut <- StatusReport{path: w.INode.Path, size: w.INode.Size}
  75. }
  76. // Scava starts a worker on the given INode.
  77. func (w *Ruspa) Scava(t *Trivella) {
  78. Console.Debugln(Gray("Scava: ", w.INode.Path))
  79. children, err := ls(w.INode.Path)
  80. if err != nil {
  81. w.ReportOut <- StatusReport{path: w.INode.Path, err: err}
  82. }
  83. for _, childPath := range children {
  84. kind, size, err := IdentifyType(childPath)
  85. if err != nil {
  86. w.ReportOut <- StatusReport{path: childPath, err: err}
  87. }
  88. childINode := NewINode(kind, size, childPath)
  89. w.INode.Children.Append(childINode)
  90. switch {
  91. case kind == FileType:
  92. Console.Debugln(Gray("[file]", w.INode.Path, " ~> ", childINode.Path))
  93. w.ReportOut <- StatusReport{path: childPath, size: size}
  94. case kind == DirType:
  95. Console.Debugln(Gray("[dir]", w.INode.Path, " ~> ", childINode.Path))
  96. w.INode.Size += size
  97. cw := NewRuspa(childINode, w.ReportIn, w.DoneIn)
  98. w.Opened[childPath] = cw
  99. go cw.Scava(t.Scendi())
  100. }
  101. }
  102. if len(w.Opened) != 0 {
  103. Console.Debugln(Gray("Spawning Ammucchia:", w.INode.Path))
  104. go w.Ammucchia(t)
  105. } else {
  106. w.DoneOut <- StatusReport{path: w.INode.Path, size: w.INode.Size}
  107. }
  108. }
  109. // Pesa starts the worker for current inode and returns the channel
  110. // to listen to get the size.
  111. func (i *INode) Pesa(t *Trivella, report, done chan StatusReport) {
  112. w := NewRuspa(i, report, done)
  113. go w.Scava(t)
  114. }
  115. // NastroConvogliatore starts a Pesa on a given path and displays
  116. // the current value of the size.
  117. func NastroConvogliatore(path string, depth uint) {
  118. finished := false
  119. pathINode, err := INodeFromPath(path)
  120. if err != nil {
  121. Console.Fatalln("Failed creating inode:", err)
  122. }
  123. Console.Debugln(Gray("Starting Pesa on path:", path))
  124. report := make(chan StatusReport)
  125. done := make(chan StatusReport)
  126. writer := bufio.NewWriter(os.Stdout)
  127. defer writer.Flush()
  128. t := TrInit(writer, depth)
  129. fmt.Fprintf(writer, "%s\n", Green("Starting..."))
  130. pathINode.Pesa(t, report, done)
  131. for !finished {
  132. select {
  133. case res := <-report:
  134. Console.Debugln(Gray("MAIN: result =>", res))
  135. case res := <-done:
  136. Console.Debugln(Gray("MAIN: done =>", res))
  137. finished = true
  138. }
  139. }
  140. fmt.Fprintf(writer, "%s: %d\n", path, pathINode.Size)
  141. return
  142. }