rss2gancio/estrai.py
2025-06-18 23:03:24 +02:00

136 lines
4.1 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import datetime
import tomllib
from pathlib import Path

import extruct
import markdown
import pytz
import requests_cache
import toml
from lxml import etree
class DataNotFound(Exception):
    """Signal that the extracted metadata holds no item of the wanted type."""
def output(*args, feed=None, item=None, session=None, **kwargs):
    """This is a feed2exec plugin.

    Parse the event page referenced by ``item["link"]`` and publish it with
    the account named by the first positional argument.

    Args:
        *args: Plugin arguments; ``args[0]`` is the account name looked up
            in the configuration by ``PageParser.publish``.
        feed: Unused here.
        item: Mapping with at least a ``"link"`` key (the event page URL).
        session: Unused here; ``PageParser`` creates its own session.
        **kwargs: Ignored.

    Returns:
        True when the event was parsed and published, False on any error
        (the error is printed next to the link instead of being raised).
    """
    try:
        # ``args`` is the tuple of positional arguments, not the name itself;
        # using the tuple as a key can never match an account in the config.
        if not args:
            raise ValueError("missing account name argument")
        account_name = args[0]
        pp = PageParser()
        event = pp.parse(item["link"])
        pp.publish(event, account_name)
    except Exception as exc:
        # Plugin boundary: report the failure and let the caller carry on.
        print(item["link"], "errore", exc)
        return False
    print(item["link"], "OK")
    return True
def find_schema(extracted_data: dict, schema: str) -> dict:
    """Return the ``"properties"`` of the first item whose ``"type"`` is *schema*.

    *extracted_data* maps an extraction method name to a list of item
    dicts; the lists are scanned in the mapping's iteration order.

    Raises:
        DataNotFound: if no item has the requested type.
    """
    for items in extracted_data.values():
        for item in items:
            if item.get("type") != schema:
                continue
            return item["properties"]
    raise DataNotFound
class PageParser:
def __init__(self):
self.session = requests_cache.CachedSession(
backend="sqlite", expire_after=datetime.timedelta(hours=2)
)
def get_page_content(self, url):
resp = self.session.get(url)
return resp.text
def main(self):
p = argparse.ArgumentParser()
p.add_argument("link")
args = p.parse_args()
self.parse(args.link)
def parse_datetime(self, timestr: str) -> datetime.datetime:
timestr = timestr.strip()
locale_tzname = "Europe/Rome"
locale_tz = pytz.timezone(locale_tzname)
for fmt in [
"%Y-%m-%d %H:%M%z",
"%Y-%m-%d %H:%M:%S%z",
]:
try:
local_date = datetime.datetime.strptime(timestr, fmt)
except Exception:
continue
return local_date.astimezone(locale_tz).strftime("%s")
for fmt in [
"%Y-%m-%dT%H:%M",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M",
"%Y-%m-%d %H:%M:%S",
]:
try:
naive_date = datetime.datetime.strptime(timestr, fmt)
except:
continue
return naive_date.strftime("%s")
raise Exception(f"Could not parse date {timestr}")
def parse(self, link):
# URL = "https://forteprenestino.net/attivita/3328-fascino-2"
content = self.get_page_content(link)
extraction = extruct.extract(content, base_url=link, return_html_node=True)
event = find_schema(extraction, "http://schema.org/Event")
event["content"] = etree.tostring(event['mainEntityOfPage']['htmlNode'], encoding='utf8').decode('utf8')
event["start_epoch"] = self.parse_datetime(event["startDate"])
if 'endDate' in event:
event["end_epoch"] = self.parse_datetime(event["endDate"])
return event
def publish(self, event: dict, account_name: str):
config_file = Path("conf.toml")
conf = tomllib.load(config_file.open("rb"))
account = conf["accounts"][account_name]
resp = self.session.post(
f"{account['url']}oauth/login",
data={
"client_id": "self",
"password": account["password"],
"username": account["username"],
"grant_type": "password",
},
)
resp.raise_for_status()
access_token = resp.json()["access_token"]
self.session.headers.update({"Authorization": f"Bearer {access_token}"})
data = {
"title": event["name"],
"description": markdown.markdown(event["description"]),
"start_datetime": event["start_epoch"],
"tags[]": "hacking",
}
if "end_epoch" in event:
data["end_datetime"] = event["end_epoch"]
if "place_id" in account:
data["place_id"] = account["place_id"]
else:
data["place_name"] = account["place_name"]
data["place_address"] = account["place_address"]
resp = self.session.post(
f"{account['url']}api/event",
data=data,
# files={ "image": open("OFFICINE.jpg", "rb"), },
)
resp.raise_for_status()
# Script entry point: run the command-line parser when executed directly.
if __name__ == "__main__":
    parser = PageParser()
    parser.main()