123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- from mastodon import Mastodon
- from datetime import datetime, timedelta
- import pytz
- from environs import Env
- from jinja2 import Template
- import logging
- import sys
- import time
- class Bot(object):
- def __init__(self, access_token, instance_url, msg_template='', debug_mode=True, account_id=1, max_chars=500):
- self.max_chars = max_chars
- self.instance_url = instance_url
- self.msg_template = Template(msg_template)
- self.client = Mastodon(
- access_token=access_token,
- api_base_url=self.instance_url
- )
- self.debug_mode = debug_mode
- #start by sending welcome to accounts that have registered after the bot has started
- self.newest_account_creation_date = pytz.utc.localize(datetime.now())
- #used to update self.newest_account_creation_date
- self.newest_account_creation_date_record = self.newest_account_creation_date
- self.log = logging.getLogger()
- if self.debug_mode:
- self.log.setLevel(logging.DEBUG)
- else:
- self.log.setLevel(logging.INFO)
- ch = logging.StreamHandler(sys.stdout)
- self.log.addHandler(ch)
- self.account_id = account_id
- def _should_get_msg(self, account):
- if self.debug_mode:
- self.log.debug("--Filtering {}--".format(account.username))
- self.log.debug("Account url: {}".format(account.url))
- self.log.debug("Created at: {}".format(account.created_at))
- self.log.debug("Newest account date {}".format(self.newest_account_creation_date))
- self.log.debug("")
- if account.created_at > self.newest_account_creation_date_record:
- self.newest_account_creation_date_record = account.created_at
- return account.url.startswith(self.instance_url) and account.created_at > self.newest_account_creation_date and not account.locked
- def get_users(self):
- users_to_msg = []
- def get_prev(users):
- followers = []
- try:
- followers = self.client.fetch_previous(users)
- except:
- self.log.error("Cannot fetch followers.")
- filtered = [follower for follower in followers if self._should_get_msg(follower)]
- users_to_msg.extend(filtered)
- # this assumes that followers are returned in descending order by follow date
- if len(filtered) == 0:
- return users_to_msg
- return get_prev(followers)
- followers = []
- try:
- followers = self.client.account_followers(self.account_id, limit=80)
- except Exception as e:
- self.log.error("Cannot fetch followers.")
- self.log.exception(e)
- except:
- self.log.error("Error")
- filtered = [follower for follower in followers if self._should_get_msg(follower)]
- users_to_msg.extend(filtered)
- if len(filtered) == 0:
- return users_to_msg
- return get_prev(followers)
- def send_msg(self, account):
- msg = self.msg_template.render(account)
- toots = msg.split('\n\n\n')
- for toot in toots:
- reply_id = None
- if len(toot) < self.max_chars:
- reply_id = self.client.status_post(toot, visibility='direct', in_reply_to_id=reply_id).id
- if self.debug_mode:
- self.log.debug("Replying to {}".format(reply_id))
- else:
- self.log.error("Make sure to correctly split the toot into parts that can be sent individually")
- def go(self):
- users_to_msg = self.get_users()
- self.newest_account_creation_date = self.newest_account_creation_date_record
- for u in users_to_msg:
- self.send_msg(u)
- if self.debug_mode:
- self.log.debug('{} toots sent'.format(len(users_to_msg)))
- def create_bot():
- env = Env()
- env.read_env()
- access_token = env('ACCESS_TOKEN')
- max_chars = env.int('INSTANCE_MAX_CHARS', 500)
- instance_base_url = env('INSTANCE_BASE_URL')
- msg_template = ''
- with open('message.template', 'r') as file:
- msg_template = file.read()
- debug_mode = env.bool('DEBUG', False)
- account_id = env.int('ACCOUNT_ID')
- bot = Bot(access_token,
- instance_url=instance_base_url,
- msg_template=msg_template,
- debug_mode=debug_mode,
- account_id=account_id,
- max_chars=max_chars)
- return bot
- def debug_shell(bot):
- cmd = input('debug command> ').strip()
- if cmd == 'c' or cmd == 'continue':
- return False
- elif cmd == 'update date':
- bot.newest_account_creation_date = pytz.utc.localize(datetime.now())
- bot.newest_account_creation_date_record = bot.newest_account_creation_date
- elif cmd.startswith('before'):
- bot.newest_account_creation_date = pytz.utc.localize(datetime.now() - timedelta(hours=int(cmd.split(' ')[1])))
- bot.newest_account_creation_date_record = bot.newest_account_creation_date
- elif cmd == 'print date':
- bot.log.debug(bot.newest_account_creation_date)
- return True
- def run():
- bot = create_bot()
- while True:
- if bot.debug_mode:
- while debug_shell(bot):
- pass
- else:
- time.sleep(30) # 30 secs
- bot.go()
- if __name__ == '__main__':
- run()
|