worker.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package sizer
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "os"
  7. )
  8. type dummy struct{}
  9. func (d dummy) Write(p []byte) (n int, err error) {
  10. return 0, nil
  11. }
  12. // Trivella provides an io.Writer with a notion of depth in the
  13. // path graph
  14. type Trivella struct {
  15. Level uint
  16. Writer io.Writer
  17. }
  18. // Scendi increases the depth in the graph
  19. func (t *Trivella) Scendi() *Trivella {
  20. var v *Trivella
  21. if t.Level > 0 {
  22. v = TrInit(t.Writer, t.Level-1)
  23. } else {
  24. var d dummy
  25. v = TrInit(d, 0)
  26. }
  27. return v
  28. }
  29. // TrInit assigns the writer to a Trivella
  30. func TrInit(writer io.Writer, level uint) *Trivella {
  31. return &Trivella{Writer: writer, Level: level}
  32. }
  33. // Ruspa is the asynchronous worker that is in charge of collecting
  34. // the underlying sizes and relay the total to the eventual parent.
  35. type Ruspa struct {
  36. INode *INode
  37. ReportIn chan StatusReport
  38. ReportOut chan StatusReport
  39. DoneIn chan StatusReport
  40. DoneOut chan StatusReport
  41. Opened map[string]*Ruspa
  42. }
  43. // NewRuspa inits a new Ruspa
  44. func NewRuspa(inode *INode, reportOut, doneOut chan StatusReport) *Ruspa {
  45. var r Ruspa
  46. r.INode = inode
  47. r.ReportOut = reportOut
  48. r.DoneOut = doneOut
  49. r.ReportIn = make(chan StatusReport)
  50. r.DoneIn = make(chan StatusReport)
  51. r.Opened = make(map[string]*Ruspa)
  52. return &r
  53. }
  54. // Ammucchia gathers the size of the underlying INode(s) from the Ruspa worker
  55. // and adds to the INode size.
  56. func (w *Ruspa) Ammucchia(t *Trivella) {
  57. Console.Debugln(Gray("Ammucchia:", w.INode.Path))
  58. for len(w.Opened) > 0 {
  59. select {
  60. case report := <-w.ReportIn:
  61. if report.err == nil {
  62. w.INode.Size += report.size
  63. fmt.Fprintf(t.Writer, "%s: %d\n", w.INode.Path, w.INode.Size)
  64. } else {
  65. Console.Debugln(Red(report.err))
  66. }
  67. case leaf := <-w.DoneIn:
  68. delete(w.Opened, leaf.path)
  69. }
  70. }
  71. Console.Debugln(Gray(w.INode.Path, ": Completed. Exiting..."))
  72. w.DoneOut <- StatusReport{path: w.INode.Path, size: w.INode.Size}
  73. }
  74. // Scava starts a worker on the given INode.
  75. func (w *Ruspa) Scava(t *Trivella) {
  76. Console.Debugln(Gray("Scava: ", w.INode.Path))
  77. children, err := ls(w.INode.Path)
  78. if err != nil {
  79. w.ReportOut <- StatusReport{path: w.INode.Path, err: err}
  80. }
  81. for _, childPath := range children {
  82. kind, size, err := IdentifyType(childPath)
  83. if err != nil {
  84. w.ReportOut <- StatusReport{path: childPath, err: err}
  85. }
  86. childINode := NewINode(kind, size, childPath)
  87. w.INode.Children.Append(childINode)
  88. switch {
  89. case kind == FileType:
  90. Console.Debugln(Gray("[file]", w.INode.Path, " ~> ", childINode.Path))
  91. w.ReportOut <- StatusReport{path: childPath, size: size}
  92. case kind == DirType:
  93. Console.Debugln(Gray("[dir]", w.INode.Path, " ~> ", childINode.Path))
  94. w.INode.Size += size
  95. cw := NewRuspa(childINode, w.ReportIn, w.DoneIn)
  96. w.Opened[childPath] = cw
  97. go cw.Scava(t.Scendi())
  98. }
  99. }
  100. if len(w.Opened) != 0 {
  101. go w.Ammucchia(t)
  102. } else {
  103. w.DoneOut <- StatusReport{path: w.INode.Path, size: w.INode.Size}
  104. }
  105. }
  106. // Pesa starts the worker for current inode and returns the channel
  107. // to listen to get the size.
  108. func (i *INode) Pesa(t *Trivella, report, done chan StatusReport) {
  109. w := NewRuspa(i, report, done)
  110. go w.Scava(t)
  111. }
  112. // NastroConvogliatore starts a Pesa on a given path and displays
  113. // the current value of the size.
  114. func NastroConvogliatore(path string, depth uint) {
  115. finished := false
  116. pathINode, err := INodeFromPath(path)
  117. if err != nil {
  118. Console.Fatalln("Failed creating inode:", err)
  119. }
  120. report := make(chan StatusReport)
  121. done := make(chan StatusReport)
  122. writer := bufio.NewWriter(os.Stdout)
  123. defer writer.Flush()
  124. t := TrInit(writer, depth)
  125. fmt.Fprintf(writer, "%s\n", Green("Starting..."))
  126. pathINode.Pesa(t, report, done)
  127. for !finished {
  128. select {
  129. case res := <-report:
  130. Console.Debugln(Gray("MAIN: result =>", res))
  131. case res := <-done:
  132. Console.Debugln(Gray("MAIN: done =>", res))
  133. finished = true
  134. }
  135. }
  136. fmt.Fprintf(writer, "%s: %d\n", path, pathINode.Size)
  137. }