136 lines
4.1 KiB
Python
Executable file
136 lines
4.1 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import datetime
|
|
import toml
|
|
from pathlib import Path
|
|
|
|
import extruct
|
|
import markdown
|
|
import pytz
|
|
import requests_cache
|
|
from lxml import etree
|
|
|
|
|
|
class DataNotFound(Exception):
    """Signal that the extracted structured data holds no item of the wanted type."""
|
|
|
|
|
|
def output(*args, feed=None, item=None, session=None, **kwargs):
    """feed2exec output plugin: parse the item's page and publish the event.

    Args:
        *args: plugin arguments; the first one is the account name looked up
            in the ``accounts`` table of the configuration by
            ``PageParser.publish``. Extra arguments are ignored.
        feed: unused here, accepted for plugin-interface compatibility.
        item: mapping describing the feed entry; only ``item["link"]`` is read.
        session: unused here, accepted for plugin-interface compatibility.
        **kwargs: ignored.

    Returns:
        True when the event was parsed and published, False on any failure
        (the failure is printed next to the link).
    """
    link = item["link"]
    if not args:
        print(link, "errore", "missing account name argument")
        return False
    # ``args`` is always a tuple; using it whole as the account key can never
    # match a TOML table name, so pick the actual account name out of it.
    account_name = args[0]
    try:
        pp = PageParser()
        event = pp.parse(link)
        pp.publish(event, account_name)
    except Exception as exc:
        # Plugin boundary: report and signal failure instead of crashing the
        # whole feed run.
        print(link, "errore", exc)
        return False
    print(link, "OK")
    return True
|
|
|
|
|
|
def find_schema(extracted_data: dict, schema: str) -> dict:
    """Return the ``properties`` of the first extracted item typed ``schema``.

    ``extracted_data`` maps each extraction method to a list of items; the
    lists are scanned in mapping order and the first item whose ``"type"``
    equals ``schema`` wins.

    Raises:
        DataNotFound: when no item of that type exists in any list.
    """
    every_item = (
        entry
        for entries in extracted_data.values()
        for entry in entries
    )
    for entry in every_item:
        if entry.get("type") != schema:
            continue
        return entry["properties"]
    raise DataNotFound
|
|
|
|
|
|
class PageParser:
    """Extract a schema.org Event from a web page and post it to an account's HTTP API.

    Pages are fetched through a ``requests_cache`` session backed by sqlite
    whose entries expire after two hours.
    """

    # Timezone assumed for timestamps that carry no UTC offset.
    LOCALE_TZNAME = "Europe/Rome"

    # Accepted layouts for timestamps that include a UTC offset.
    _AWARE_FORMATS = (
        "%Y-%m-%d %H:%M%z",
        "%Y-%m-%d %H:%M:%S%z",
        "%Y-%m-%dT%H:%M%z",
        "%Y-%m-%dT%H:%M:%S%z",
    )
    # Accepted layouts for timestamps without any offset.
    _NAIVE_FORMATS = (
        "%Y-%m-%dT%H:%M",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d %H:%M:%S",
    )

    def __init__(self):
        self.session = requests_cache.CachedSession(
            backend="sqlite", expire_after=datetime.timedelta(hours=2)
        )

    def get_page_content(self, url):
        """Fetch ``url`` through the cached session and return the body as text."""
        resp = self.session.get(url)
        return resp.text

    def main(self):
        """Command-line entry point: parse the page given as the ``link`` argument."""
        p = argparse.ArgumentParser()
        p.add_argument("link")
        args = p.parse_args()

        self.parse(args.link)

    def parse_datetime(self, timestr: str) -> str:
        """Convert a textual timestamp into Unix epoch seconds, as a string.

        Timestamps with a UTC offset are converted exactly. Timestamps without
        one are interpreted as ``LOCALE_TZNAME`` wall-clock time.

        Args:
            timestr: the timestamp; surrounding whitespace is ignored.

        Returns:
            The number of whole seconds since the epoch, formatted as a
            decimal string.

        Raises:
            ValueError: when ``timestr`` matches none of the known layouts.
        """
        timestr = timestr.strip()
        for fmt in self._AWARE_FORMATS + self._NAIVE_FORMATS:
            try:
                parsed = datetime.datetime.strptime(timestr, fmt)
            except ValueError:
                continue
            if parsed.tzinfo is None:
                # No offset in the text: pin it to the configured timezone
                # rather than to whatever zone the host happens to run in.
                parsed = pytz.timezone(self.LOCALE_TZNAME).localize(parsed)
            # ``timestamp()`` honours tzinfo; ``strftime("%s")`` is a
            # platform-specific extension that reads the fields as system
            # local time and therefore gives host-dependent results.
            return str(int(parsed.timestamp()))
        raise ValueError(f"Could not parse date {timestr}")

    def parse(self, link):
        """Download ``link`` and return the properties of its schema.org Event.

        Besides the extracted properties the returned dict gets:
        ``content`` (serialized markup of the ``mainEntityOfPage`` node),
        ``start_epoch`` and, when the event has an ``endDate``, ``end_epoch``.

        Example link: https://forteprenestino.net/attivita/3328-fascino-2

        Raises:
            DataNotFound: when the page exposes no schema.org Event.
        """
        content = self.get_page_content(link)
        extraction = extruct.extract(content, base_url=link, return_html_node=True)
        event = find_schema(extraction, "http://schema.org/Event")
        event["content"] = etree.tostring(
            event["mainEntityOfPage"]["htmlNode"], encoding="utf8"
        ).decode("utf8")
        event["start_epoch"] = self.parse_datetime(event["startDate"])
        if "endDate" in event:
            event["end_epoch"] = self.parse_datetime(event["endDate"])
        return event

    def publish(self, event: dict, account_name: str):
        """Log in to the account named ``account_name`` and create the event there.

        The account settings (``url``, ``username``, ``password`` and either
        ``place_id`` or ``place_name``/``place_address``) are read from the
        ``accounts`` table of ``conf.toml`` in the current working directory.

        Args:
            event: event dict as produced by ``parse``; reads ``name``,
                ``description``, ``start_epoch`` and optionally ``end_epoch``.
            account_name: key of the account inside ``accounts``.

        Raises:
            KeyError: when the account or a required setting is missing.
            requests.HTTPError: when the login or the event creation fails.
        """
        # Read through the ``toml`` module the file imports; ``read_text``
        # also makes sure the configuration file is closed again.
        conf = toml.loads(Path("conf.toml").read_text(encoding="utf-8"))
        account = conf["accounts"][account_name]
        # Tolerate a configured URL with or without the trailing slash.
        base_url = account["url"].rstrip("/") + "/"

        resp = self.session.post(
            f"{base_url}oauth/login",
            data={
                "client_id": "self",
                "password": account["password"],
                "username": account["username"],
                "grant_type": "password",
            },
        )
        resp.raise_for_status()
        access_token = resp.json()["access_token"]
        # Send the token only with the API call below; storing it on the
        # shared session would attach it to later fetches of scraped pages.
        auth_headers = {"Authorization": f"Bearer {access_token}"}

        data = {
            "title": event["name"],
            "description": markdown.markdown(event["description"]),
            "start_datetime": event["start_epoch"],
            "tags[]": "hacking",
        }
        if "end_epoch" in event:
            data["end_datetime"] = event["end_epoch"]
        if "place_id" in account:
            data["place_id"] = account["place_id"]
        else:
            data["place_name"] = account["place_name"]
            data["place_address"] = account["place_address"]

        # NOTE: no image is uploaded; a ``files={"image": ...}`` argument
        # would be the place to add one.
        resp = self.session.post(
            f"{base_url}api/event",
            data=data,
            headers=auth_headers,
        )
        resp.raise_for_status()
|
|
|
|
|
|
if __name__ == "__main__":
    # Run as a script: parse the page whose link is given on the command line.
    parser = PageParser()
    parser.main()
|