zic2ics.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. #!/usr/bin/env python3
  2. import requests
  3. import os
  4. from bs4 import BeautifulSoup
  5. from datetime import datetime, timedelta
  6. import locale
  7. import pytz
  8. from icalendar import Calendar, Event, vDatetime
  9. import re
  10. import argparse
  11. import glob
# LC_TIME is switched to Italian because html2cal parses dates with
# strptime's %A/%B directives, which match locale-dependent day and month
# names. Raises locale.Error at import time if the locale is not installed.
locale.setlocale(locale.LC_TIME,'it_IT.UTF-8')
  13. def zic2ics(outpath):
  14. os.makedirs('history', exist_ok=True)
  15. page = requests.get("https://zic.it/agenda/").content
  16. cal = html2cal(page)
  17. ical = cal.to_ical()
  18. try:
  19. with open('zic.ics', 'rb') as f:
  20. ical_last = f.read()
  21. except:
  22. ical_last = ""
  23. if ical != ical_last:
  24. try:
  25. with open('zic_all.ics', 'rb') as f:
  26. cal_all = Calendar.from_ical(f.read())
  27. except:
  28. cal_all = Calendar()
  29. ical_all = icalmerge(cal_all, cal, True).to_ical()
  30. with open('zic.ics', 'wb') as f:
  31. f.write(ical)
  32. with open('zic_all.ics', 'wb') as f:
  33. f.write(ical_all)
  34. with open(os.path.join('history','zic_%s.ics' % datetime.now().strftime("%Y%m%d%H%M%S")), 'wb') as f:
  35. f.write(ical)
  36. with open(os.path.join('history','zic_%s.html' % datetime.now().strftime("%Y%m%d%H%M%S")), 'wb') as f:
  37. f.write(page)
  38. if not outpath is None:
  39. with open(os.path.join(outpath, 'zic.ics'), 'wb') as f:
  40. f.write(ical)
  41. with open(os.path.join(outpath, 'zic_all.ics'), 'wb') as f:
  42. f.write(ical_all)
  43. def zicrebuild():
  44. cal_all = Calendar()
  45. for html in sorted(glob.glob(os.path.join("history","zic_*.html"))):
  46. # print(html)
  47. with open(html, 'r') as f:
  48. page = f.read()
  49. cal = html2cal(page)
  50. cdat = datetime.strptime(os.path.basename(html).replace("zic_","").replace(".html",""),"%Y%m%d%H%M%S").astimezone()
  51. cal_all = icalmerge(cal_all, cal, True, 15, cdat)
  52. with open('zic_rebuild.ics', 'wb') as f:
  53. f.write(cal.to_ical())
  54. with open('zic_rebuild_all.ics', 'wb') as f:
  55. f.write(cal_all.to_ical())
  56. def html2cal(page):
  57. cal = Calendar()
  58. cal.add('prodid', '-//Agenda di zic.it//')
  59. cal.add('version', '2.0')
  60. soup = BeautifulSoup(page, "html.parser")
  61. agenda = soup.find("div", class_="entry-content clearfix")
  62. entries = agenda.find_all("p", style=None)
  63. for entry in entries:
  64. try:
  65. lnk = entry.find('a', href=True)['href']
  66. spe = entry.get_text(strip=True, separator='\n').splitlines()
  67. tit = spe[0].strip()
  68. dat = datetime.strptime(spe[2], "%A %d %B %Y - %H:%M").astimezone(tz=pytz.timezone('Europe/Rome'))
  69. det = spe[3][2:-2].split(";")
  70. dov = det[0].strip()
  71. chi = det[1].strip() if 1 < len(det) else ""
  72. # print(" ~ ".join([tit, str(dat), dov, chi]))
  73. tituid = re.sub("[\(\[].*?[\)\]]","",tit).strip() # "\(.*?\)"
  74. uid = "|".join([dat.strftime("%Y%m%d"), tituid, dov])
  75. event = Event()
  76. event.add('summary', tit)
  77. event.add('dtstart', vDatetime(dat))
  78. event.add('description', chi)
  79. event.add('location', dov)
  80. event.add('url', lnk)
  81. event.add('uid', uid)
  82. if any(map(tit.__contains__, ["[rinviato]","[annullato]"])):
  83. event.add('status','CANCELLED')
  84. cal.add_component(event)
  85. except:
  86. pass
  87. return(cal)
  88. def icalmerge(calold, calnew, delete = False, tolerance = 15, cutoffdate = None):
  89. if cutoffdate is None:
  90. cutoffdate = datetime.now(pytz.UTC)
  91. cutoffdate = cutoffdate + timedelta(minutes = tolerance)
  92. cal = Calendar()
  93. cal.add('prodid', calnew.get('prodid').to_ical().decode('utf-8'))
  94. cal.add('version', calnew.get('version').to_ical().decode('utf-8'))
  95. #Add a computed UID if missing
  96. for e in calold.walk('VEVENT'):
  97. if e.get('UID') is None:
  98. uid = "|".join([e.decoded('dtstart').strftime("%Y%m%d"), e.get('Summary').to_ical().decode('utf-8'), e.get('Location').to_ical().decode('utf-8')])
  99. e.add('uid', uid)
  100. for e in calnew.walk('VEVENT'):
  101. if e.get('UID') is None:
  102. uid = "|".join([e.decoded('dtstart').strftime("%Y%m%d"), e.get('Summary').to_ical().decode('utf-8'), e.get('Location').to_ical().decode('utf-8')])
  103. e.add('uid', uid)
  104. calold_uids = [e.get('UID').to_ical().decode('utf-8') for e in calold.walk('VEVENT')]
  105. calnew_uids = [e.get('UID').to_ical().decode('utf-8') for e in calnew.walk('VEVENT')]
  106. imported_uids = []
  107. for e in calold.walk('VEVENT'):
  108. uid = e.get('UID').to_ical().decode('utf-8')
  109. if uid not in calnew_uids:
  110. if e.get('dtstart').dt < cutoffdate:
  111. # print("Past \"%s\"" % uid)
  112. cal.add_component(e)
  113. else:
  114. if delete:
  115. print("Deleted \"%s\"" % uid)
  116. else:
  117. if e.get('status') != 'CANCELLED':
  118. print("Cancelled \"%s\"" % uid)
  119. e['status'] = 'CANCELLED'
  120. cal.add_component(e)
  121. for e in calnew.walk('VEVENT'):
  122. uid = e.get('UID').to_ical().decode('utf-8')
  123. if uid not in calold_uids:
  124. print("Added \"%s\"" % uid)
  125. cal.add_component(e)
  126. else:
  127. # print("Updated \"%s\"" % uid)
  128. cal.add_component(e)
  129. return(cal)
# Disabled workaround, kept for reference: a bug when decoding a date that
# carries a TZID parameter. For example
#     DTSTART;TZID=Europe/Rome;VALUE=DATE-TIME:20220611T160000
# is decoded to 2022-06-11 16:00:00+00:50 instead of 2022-06-11 16:00:00+02:00.
#
# def cal2date(dtx):
#     tzx = dtx.params['TZID']
#     if tzx is None:
#         return dtx.dt
#     else:
#         print(tzx)
#         return (dtx.dt.replace(tzinfo=None).astimezone(tz=pytz.timezone(tzx)))
  140. def dir_path(path):
  141. if os.path.isdir(path):
  142. return path
  143. else:
  144. raise argparse.ArgumentTypeError(f"readable_dir:{path} is not a valid path")
  145. if __name__ == "__main__":
  146. os.chdir(os.path.dirname(__file__))
  147. parser = argparse.ArgumentParser()
  148. parser.add_argument("--rebuild", "-r", action='store_true')
  149. parser.add_argument("--output", "-o", type=dir_path)
  150. args = parser.parse_args()
  151. if args.rebuild:
  152. print("Rebuilding")
  153. zicrebuild()
  154. else:
  155. zic2ics(args.output)