import argparse import configparser import datetime import getpass import logging import pickle import sys import requests as requests from constants import COMING_ENTRY_CODE_ID, LEAVING_ENTRY_CODE_ID, PUNCH_COMMANDS, BREAK_START_ENTRY_CODE_ID, \ BREAK_END_ENTRY_CODE_ID, SIMPLE_DATE_FORMAT, SIMPLE_TIME_FORMAT, SIMPLE_DATETIME_FORMAT, \ SIMPLE_DATETIME_FORMAT_HUMAN, USER_AGENT logger = logging.getLogger() logging.basicConfig(level=logging.INFO) class TimeBot: def __init__(self, baseurl: str, user: str, password: str = None, ask_for_password: bool = False, save_session: bool = False): self.logger = logging.getLogger(self.__class__.__name__) self.baseurl = self._sanitize_baseurl(baseurl) self.user = user self.password = password self._ask_for_password = ask_for_password self._save_session = save_session self._session = None self._current_user = None @staticmethod def _sanitize_baseurl(baseurl: str): if baseurl.endswith("/"): return baseurl return baseurl + "/" @property def session(self): """ Return requests session object and auto login if necessary. :return: requests session :raises on any http error """ if self._session is None: self._session = requests.Session() self._session.headers.update({"User-Agent": USER_AGENT}) try: self._load_session_cookies(self._session) except FileNotFoundError as e: logger.warning(e) # file does not exist... ignored except (EOFError, pickle.UnpicklingError) as e: logger.warning(e) # file seems to be corrupt... ignoring request = self._session.get(self.baseurl + "Employee/GetEmployeeList") if 400 <= request.status_code < 500: self.logger.debug(f"got error {request.status_code}... trying to log in") self._login(self._session) self._save_session_cookies(self._session) else: request.raise_for_status() return self._session def _login(self, session: requests.Session): """ Obtain session cookie. :raises: on status code != 2xx """ if self.password is None and self._ask_for_password: self.password = self._get_password() else: raise Exception("could not obtain password") login_data = { "username": self.user, "password": self.password, } request = session.post(self.baseurl + "Account/LogOn", data=login_data) request.raise_for_status() @staticmethod def _get_password(): """ Ask the user for his password. :return: the users password """ return getpass.getpass("Enter your password: ") def _save_session_cookies(self, session: requests.Session): """ Save the session cookies as pickle file. :param requests.Session session: the requests session to extract the cookies from """ if self._save_session: with open(".kekse", "wb") as f: self.logger.debug("pickling session cookies") pickle.dump(requests.utils.dict_from_cookiejar(session.cookies), f) def _load_session_cookies(self, session: requests.Session): """ Load the session cookies from the pickle file and updates the given session. :param requests.Session session: the requests session which will be updated with the loaded cookies """ if self._save_session: with open(".kekse", "rb") as f: self.logger.debug("loading pickled cookies") session.cookies.update(requests.utils.cookiejar_from_dict(pickle.load(f))) def add_entry(self, punch_datetime: datetime.datetime, entry_code: int, note: str = None) -> requests.Response: """ Add mobatime entry. :param datetime.datetime punch_datetime: datetim object :param int entry_code: entry type code :param str note: free text note added to the Mobatime entry :return: requests response object """ punch_date = punch_datetime.strftime(SIMPLE_DATE_FORMAT) punch_time = punch_datetime.strftime(SIMPLE_TIME_FORMAT) entry_data = { "periode0Date": punch_date, "periode0Time": punch_time, "selectedEntryCode": entry_code, "selectedPeriodType": 0, "employeeId": self.get_current_user_id(), } if note: entry_data["note"] = note request = self.session.post(self.baseurl + "Entry/SaveEntry", data=entry_data) return request def list_employees(self): """ List all employees which are obviously in the same team as you. :return: list of employees """ request = self.session.get(self.baseurl + "Employee/GetEmployees") request.raise_for_status() return request.json() def _get_user_profile_for_current_user(self): """ Returns all user information for the current user. :return: all user information as dict """ request = self.session.get(self.baseurl + "Employee/GetUserProfileForCurrentUser") request.raise_for_status() return request.json() @property def current_user(self): """ Returns all user information for the current user. :return: all user information as dict """ if self._current_user is None: self._current_user = self._get_user_profile_for_current_user() return self._current_user def get_current_user_id(self): """ Get the current users id. :return: user id """ return self.current_user["employee"]["id"] def get_entries(self, entries: int = 10, start_date: datetime.datetime = None, end_date: datetime.datetime = None) -> list: """ List time entries. :param int entries: number of entries to return :param datetime.datetime start_date: start date of request; by default ``end_date`` - 10 days :param datetime.datetime end_date: end date of request; usually the more recent date :return: """ if not end_date: end_date = datetime.datetime.now() if not start_date: start_date = end_date - datetime.timedelta(days=10) filters = {"take": entries, "skip": 0, "page": 1, "pageSize": entries, "sort": [{"field": "dateTime", "dir": "desc"}], "filter": {"logic": "and", "filters": [ {"field": "employeeId", "operator": "eq", "value": self.get_current_user_id()}, {"field": "errorText", "operator": "eq", "value": "all"}]}} filters["filter"]["filters"].append({ "field": "startDate", "operator": "gte", "value": start_date.strftime(SIMPLE_DATETIME_FORMAT), }) filters["filter"]["filters"].append({ "field": "endDate", "operator": "lte", "value": end_date.strftime(SIMPLE_DATETIME_FORMAT), }) request = self.session.post(self.baseurl + "Entry/GetEntries", json=filters) request.raise_for_status() return request.json()["data"] def get_tracking_data(self) -> dict: """ Get the current tracking state. This contains: * current presence state * last booking * last booking date * available quick actions :return: dict of the info mentioned above """ request = self.session.get(self.baseurl + "Tracking/GetTrackingData") request.raise_for_status() return request.json() def get_account_information(self) -> list: """ Get the current account information. * remaining vacation days * time account balance :return: list with mentioned infos """ request = self.session.get(self.baseurl + "Employee/GetAccountInformation") request.raise_for_status() return request.json() def punch_in(self, punch_datetime: datetime.datetime): """ :param datetime.datetime punch_datetime: datetime object :raises: on status code != 2xx """ self.add_entry(punch_datetime, COMING_ENTRY_CODE_ID, note="da").raise_for_status() def punch_out(self, punch_datetime: datetime.datetime): """ :param datetime.datetime punch_datetime: datetime object :raises: on status code != 2xx """ self.add_entry(punch_datetime, LEAVING_ENTRY_CODE_ID, note="weg").raise_for_status() def break_start(self, punch_datetime: datetime.datetime): """ :param datetime.datetime punch_datetime: datetime object :raises: on status code != 2xx """ self.add_entry(punch_datetime, BREAK_START_ENTRY_CODE_ID, note="pause").raise_for_status() def break_end(self, punch_datetime: datetime.datetime): """ :param datetime.datetime punch_datetime: datetime object :raises: on status code != 2xx """ self.add_entry(punch_datetime, BREAK_END_ENTRY_CODE_ID, note="pause ende").raise_for_status() if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("-v", help="enable debug logging", action="store_true") parser.add_argument("-u", help="mobatime login user", required=True) parser.add_argument("-p", help="mobatime login user password", default=None) parser.add_argument("-c", help="config file", default="timebot.ini") parser.add_argument("--save-cookies", help="save auth cookies to `./.kekse`", action="store_true", default=False) subparsers = parser.add_subparsers(help='sub-command help', dest='subparser_name') # subparser command: `status` parser_status = subparsers.add_parser("status", help="show your current tracking status") # subparser command: `list-entries` parser_list_entries = subparsers.add_parser("list-entries", help="use this command to list your time entries") parser_list_entries.add_argument("--start-date", help=f"start date filter in format `{SIMPLE_DATETIME_FORMAT_HUMAN}` " f"(default: now - 10days; unset for default)") parser_list_entries.add_argument("--end-date", help=f"end date filter in format `{SIMPLE_DATETIME_FORMAT_HUMAN}` " f"(default: now; unset for default)") parser_list_entries.add_argument("--items", help="max items to request per page", default=20, type=int) # subparser command: `punch` parser_punch = subparsers.add_parser("punch", help="use this command to punch in, punch out, or create break entries") parser_punch.add_argument("-t", help=f"type of time entry; this can be {', '.join(PUNCH_COMMANDS)}", default="punch_in", choices=PUNCH_COMMANDS) parser_punch.add_argument("-s", help=f"timestamp in format `{SIMPLE_DATETIME_FORMAT_HUMAN}` or `now`", default="now") # subparser command: `smart-punch` parser_smart_punch = subparsers.add_parser("smart-punch", help="use this command to auto punch in, punch out and create break " "entries; this command tries to detect your last action and will " "create entries in to following order: " "punch_in(now) -> break_start(now) -> break_end(now) " "-> punch_out(now)") args = parser.parse_args() if args.v: logger.setLevel(logging.DEBUG) config = configparser.ConfigParser() config.read(args.c) tb = TimeBot(baseurl=config["general"]["baseurl"], user=args.u, password=args.p, ask_for_password=True, save_session=args.save_cookies) if args.subparser_name == "punch": if args.s == "now": punch_datetime = datetime.datetime.now() else: punch_datetime = datetime.datetime.strptime(args.s, SIMPLE_DATETIME_FORMAT) logger.info("running `{}` with date `{}` and time `{}`".format( args.t, punch_datetime.strftime(SIMPLE_DATE_FORMAT), punch_datetime.strftime(SIMPLE_TIME_FORMAT), )) getattr(tb, args.t)(punch_datetime) elif args.subparser_name == "smart-punch": now = datetime.datetime.now() last_punch = tb.get_entries(1, now.replace(hour=0, minute=0, second=0, microsecond=0)) method = None if not last_punch: logger.debug("could not detect any time entry for today... punching in") method = "punch_in" elif last_punch[0]["entryNumber"] == COMING_ENTRY_CODE_ID: logger.debug("your last entry was `punch_in`... starting break") method = "break_start" elif last_punch[0]["entryNumber"] == BREAK_START_ENTRY_CODE_ID: logger.debug("your last entry was `break_start`... ending break") method = "break_end" elif last_punch[0]["entryNumber"] == BREAK_END_ENTRY_CODE_ID: logger.debug("your last entry was `break_end`... punching out") method = "punch_out" elif last_punch[0]["entryNumber"] == LEAVING_ENTRY_CODE_ID: logger.error("your last entry was `punch_out`... punching in again with this command is not supported") sys.exit(1) if method is None: logger.error("hit an unknown situation... detection failed; run with `-v` for more info") logger.debug(f"last entry was: {last_punch}") exit(1) logger.info("running `{}` with date `{}` and time `{}`".format( method, now.strftime(SIMPLE_DATE_FORMAT), now.strftime(SIMPLE_TIME_FORMAT), )) getattr(tb, method)(now) elif args.subparser_name == "list-entries": end_date = None if args.end_date: end_date = datetime.datetime.strptime(args.end_date, SIMPLE_DATETIME_FORMAT) start_date = None if args.start_date: start_date = datetime.datetime.strptime(args.start_date, SIMPLE_DATETIME_FORMAT) data = tb.get_entries(entries=args.items, start_date=start_date, end_date=end_date) data.reverse() for i in data: print("Entry: {} - DateTime: {} - Note: {}".format(i["entryName"], i["dateTime"], i["note"])) elif args.subparser_name == "status": tracking_data = tb.get_tracking_data() account_info = tb.get_account_information() print("Tracking Info:") print(f" Current State: {tracking_data['actualState']}") print(f" Last Booking: {tracking_data['lastBooking']}") print(f" Last Booking Date: {tracking_data['lastBookingDate']}") print("Account Infos:") for i in account_info: print(f" {i['accountName']}: {i['value']}") else: logger.error("Noting done... dunno what you want!") sys.exit(1) sys.exit(0)