worker.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package sizer
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "os"
  7. )
  // dummy is a writer that throws away everything written to it. Scendi
  // hands it out once the depth budget of a Trivella is exhausted.
  type dummy struct{}

  // Write discards p and reports it as fully written.
  //
  // It returns len(p) rather than 0: the io.Writer contract requires a
  // non-nil error whenever fewer than len(p) bytes are reported, so a
  // (0, nil) result would be treated as a short write by wrappers such as
  // bufio.Writer or io.Copy.
  func (d dummy) Write(p []byte) (n int, err error) {
  	return len(p), nil
  }
  // Trivella pairs an io.Writer with a depth budget for walking the path
  // graph.
  type Trivella struct {
  	// Level is the number of further descents (see Scendi) that keep
  	// using Writer before output is switched to a discarding writer.
  	Level uint

  	// Writer receives the progress lines emitted while walking.
  	Writer io.Writer
  }
  18. // Scendi increases the depth in the graph
  19. func (t *Trivella) Scendi() *Trivella {
  20. var v *Trivella
  21. if t.Level > 0 {
  22. v = TrInit(t.Writer, t.Level-1)
  23. } else {
  24. var d dummy
  25. v = TrInit(d, 0)
  26. }
  27. return v
  28. }
  29. // TrInit assigns the writer to a Trivella
  30. func TrInit(writer io.Writer, level uint) *Trivella {
  31. var t Trivella
  32. t.Writer = writer
  33. t.Level = level
  34. return &t
  35. }
  36. // Ruspa is the asynchronous worker that is in charge of collecting
  37. // the underlying sizes and relay the total to the eventual parent.
  38. type Ruspa struct {
  39. INode *INode
  40. ReportIn chan StatusReport
  41. ReportOut chan StatusReport
  42. DoneIn chan StatusReport
  43. DoneOut chan StatusReport
  44. Opened map[string]*Ruspa
  45. }
  46. // NewRuspa inits a new Ruspa
  47. func NewRuspa(inode *INode, reportOut, doneOut chan StatusReport) *Ruspa {
  48. var r Ruspa
  49. r.INode = inode
  50. r.ReportOut = reportOut
  51. r.DoneOut = doneOut
  52. r.ReportIn = make(chan StatusReport)
  53. r.DoneIn = make(chan StatusReport)
  54. r.Opened = make(map[string]*Ruspa)
  55. return &r
  56. }
  57. // Ammucchia gathers the size of the underlying INode(s) from the Ruspa worker
  58. // and adds to the INode size.
  59. func (w *Ruspa) Ammucchia(t *Trivella) {
  60. Console.Debugln(Gray("Ammucchia:", w.INode.Path))
  61. for len(w.Opened) > 0 {
  62. select {
  63. case report := <-w.ReportIn:
  64. Console.Debugln(Gray("ReportIn:", report))
  65. if report.err == nil {
  66. w.INode.Size += report.size
  67. fmt.Fprintf(t.Writer, "%s: %d\n", w.INode.Path, w.INode.Size)
  68. } else {
  69. Console.Debugln(Red(report.err))
  70. }
  71. case leaf := <-w.DoneIn:
  72. Console.Debugln(Gray("Done:", leaf.path))
  73. delete(w.Opened, leaf.path)
  74. }
  75. }
  76. Console.Debugln(Gray(w.INode.Path, ": Completed. Exiting..."))
  77. w.DoneOut <- StatusReport{path: w.INode.Path, size: w.INode.Size}
  78. }
  79. // Scava starts a worker on the given INode.
  80. func (w *Ruspa) Scava(t *Trivella) {
  81. Console.Debugln(Gray("Scava: ", w.INode.Path))
  82. children, err := ls(w.INode.Path)
  83. if err != nil {
  84. w.ReportOut <- StatusReport{path: w.INode.Path, err: err}
  85. }
  86. for _, childPath := range children {
  87. kind, size, err := IdentifyType(childPath)
  88. if err != nil {
  89. w.ReportOut <- StatusReport{path: childPath, err: err}
  90. }
  91. childINode := NewINode(kind, size, childPath)
  92. w.INode.Children.Append(childINode)
  93. switch {
  94. case kind == FileType:
  95. Console.Debugln(Gray("[file]", w.INode.Path, " ~> ", childINode.Path))
  96. w.ReportOut <- StatusReport{path: childPath, size: size}
  97. case kind == DirType:
  98. Console.Debugln(Gray("[dir]", w.INode.Path, " ~> ", childINode.Path))
  99. w.INode.Size += size
  100. cw := NewRuspa(childINode, w.ReportIn, w.DoneIn)
  101. w.Opened[childPath] = cw
  102. go cw.Scava(t.Scendi())
  103. }
  104. }
  105. if len(w.Opened) != 0 {
  106. Console.Debugln(Gray("Spawning Ammucchia:", w.INode.Path))
  107. go w.Ammucchia(t)
  108. } else {
  109. w.DoneOut <- StatusReport{path: w.INode.Path, size: w.INode.Size}
  110. }
  111. }
  112. // Pesa starts the worker for current inode and returns the channel
  113. // to listen to get the size.
  114. func (i *INode) Pesa(t *Trivella, report, done chan StatusReport) {
  115. w := NewRuspa(i, report, done)
  116. go w.Scava(t)
  117. }
  118. // NastroConvogliatore starts a Pesa on a given path and displays
  119. // the current value of the size.
  120. func NastroConvogliatore(path string, depth uint) {
  121. finished := false
  122. pathINode, err := INodeFromPath(path)
  123. if err != nil {
  124. Console.Fatalln("Failed creating inode:", err)
  125. }
  126. Console.Debugln(Gray("Starting Pesa on path:", path))
  127. report := make(chan StatusReport)
  128. done := make(chan StatusReport)
  129. writer := bufio.NewWriter(os.Stdout)
  130. defer writer.Flush()
  131. t := TrInit(writer, depth)
  132. fmt.Fprintf(writer, "%s\n", Green("Starting..."))
  133. pathINode.Pesa(t, report, done)
  134. for !finished {
  135. select {
  136. case res := <-report:
  137. Console.Debugln(Gray("MAIN: result =>", res))
  138. case res := <-done:
  139. Console.Debugln(Gray("MAIN: done =>", res))
  140. finished = true
  141. }
  142. }
  143. fmt.Fprintf(writer, "%s: %d\n", path, pathINode.Size)
  144. return
  145. }