mobatime cmd line util
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
timebot/timebot/timebot.py

397 lines
15 KiB

import datetime
import getpass
import logging
import pickle
import sys
from typing import Union
import requests as requests
from timebot.constants import PunchCodes, DateFormats, USER_AGENT
package_logger = logging.getLogger(__name__)
class TimebotObtainPasswordError(Exception):
pass
class TimeParseError(Exception):
pass
def parse_user_time_input(stamp: str) -> datetime.datetime:
errors = []
try:
package_logger.debug(f"trying to parse date with format \"{DateFormats.SIMPLE_DATETIME.value}\"")
return datetime.datetime.strptime(stamp, DateFormats.SIMPLE_DATETIME.value)
except ValueError as e:
errors.append(e)
try:
package_logger.debug(f"trying to parse date with format \"{DateFormats.SIMPLE_TIME}\"")
t = datetime.datetime.strptime(stamp, DateFormats.SIMPLE_TIME.value)
dt = datetime.datetime.now()
dt = dt.replace(hour=0, minute=0, second=0, microsecond=0)
dt += datetime.timedelta(hours=t.hour, minutes=t.minute)
return dt
except ValueError as e:
errors.append(e)
package_logger.error(f"could not parse date with format \"{DateFormats.SIMPLE_DATETIME.value}\" or"
f" \"{DateFormats.SIMPLE_TIME.value}\"")
raise TimeParseError(errors)
class MobatimeApi:
def __init__(self, baseurl: str, user: str, password: str = None, ask_for_password: bool = False,
save_session: bool = False, cookie_store: str = ".kekse"):
self.logger = logging.getLogger(self.__class__.__name__)
self.baseurl = self._sanitize_baseurl(baseurl)
self.user = user
self.password = password
self.ask_for_password = ask_for_password
self._save_session = save_session
self._session = None
self._cookie_store = cookie_store
self._current_user = None
@staticmethod
def _sanitize_baseurl(baseurl: str):
if baseurl.endswith("/"):
return baseurl
return baseurl + "/"
@property
def session(self):
"""
Return requests session object and auto login if necessary.
:return: requests session
:raises on any http error
"""
if self._session is None:
self._session = requests.Session()
self._session.headers.update({"User-Agent": USER_AGENT})
try:
self._load_session_cookies(self._session)
except FileNotFoundError as e:
self.logger.warning(e) # file does not exist... ignored
except (EOFError, pickle.UnpicklingError) as e:
self.logger.warning(e) # file seems to be corrupt... ignoring
request = self._session.get(self.baseurl + "Employee/GetEmployeeList", timeout=2)
if 400 <= request.status_code < 500:
self.logger.debug(f"got error {request.status_code}... trying to log in")
self._login(self._session)
self._save_session_cookies(self._session)
else:
request.raise_for_status()
return self._session
def _login(self, session: requests.Session):
"""
Clear existing session cookies and try new login.
:raises: on status code != 2xx
"""
if self.password is None and self.ask_for_password:
self.password = self._get_password()
elif self.password and not self.ask_for_password:
pass
else:
raise TimebotObtainPasswordError("Unauthorized: could not obtain password")
login_data = {
"username": self.user,
"password": self.password,
}
session.cookies.clear_session_cookies()
session.post(self.baseurl + "Account/LogOn",
data=login_data,
timeout=2).raise_for_status() # This always gives 200 ... even with wrong password
session.get(self.baseurl + "Employee/GetEmployeeList", timeout=2).raise_for_status()
@staticmethod
def _get_password():
"""
Ask the user for his password.
:return: the users password
"""
return getpass.getpass("Enter your password: ")
def _save_session_cookies(self, session: requests.Session):
"""
Save the session cookies as pickle file.
:param requests.Session session: the requests session to extract the cookies from
"""
if self._save_session:
with open(self._cookie_store, "wb") as f:
self.logger.debug("pickling session cookies")
pickle.dump(requests.utils.dict_from_cookiejar(session.cookies), f)
def _load_session_cookies(self, session: requests.Session):
"""
Load the session cookies from the pickle file and updates the given session.
:param requests.Session session: the requests session which will be updated with the loaded cookies
"""
if self._save_session:
with open(self._cookie_store, "rb") as f:
self.logger.debug("loading pickled cookies")
session.cookies.update(requests.utils.cookiejar_from_dict(pickle.load(f)))
@property
def current_user(self):
"""
Returns all user information for the current user.
:return: all user information as dict
"""
if self._current_user is None:
self._current_user = self.get_user_profile_for_current_user()
return self._current_user
def get_current_user_id(self):
"""
Get the current users id.
:return: user id
"""
return self.current_user["employee"]["id"]
def get_employees(self):
"""
List all employees which are obviously in the same team as you.
:return: list of employees
"""
request = self.session.get(self.baseurl + "Employee/GetEmployees", timeout=2)
request.raise_for_status()
return request.json()
def get_user_profile_for_current_user(self):
"""
Returns all user information for the current user.
:return: all user information as dict
"""
request = self.session.get(self.baseurl + "Employee/GetUserProfileForCurrentUser", timeout=2)
request.raise_for_status()
return request.json()
def save_entry(self, punch_datetime: datetime.datetime, entry_code: int, note: str = None) -> requests.Response:
"""
Add mobatime entry.
:param datetime.datetime punch_datetime: datetim object
:param int entry_code: entry type code
:param str note: free text note added to the Mobatime entry
:return: requests response object
"""
punch_date = punch_datetime.strftime(DateFormats.SIMPLE_DATE.value)
punch_time = punch_datetime.strftime(DateFormats.SIMPLE_TIME.value)
entry_data = {
"periode0Date": punch_date,
"periode0Time": punch_time,
"selectedEntryCode": entry_code,
"selectedPeriodType": 0,
"employeeId": self.get_current_user_id(),
}
if note:
entry_data["note"] = note
request = self.session.post(self.baseurl + "Entry/SaveEntry", data=entry_data, timeout=2)
return request
def get_entries(self, entries: Union[str, None] = 10,
start_date: datetime.datetime = None,
end_date: datetime.datetime = None) -> list:
"""
List time entries.
:param int entries: number of entries to return
:param datetime.datetime start_date: start date of request; by default ``end_date`` - 10 days
:param datetime.datetime end_date: end date of request; usually the more recent date
:return:
"""
if not end_date:
end_date = datetime.datetime.now()
if not start_date:
start_date = end_date - datetime.timedelta(days=10)
filters = {"take": entries, "skip": 0, "page": 1, "pageSize": entries,
"sort": [{"field": "dateTime", "dir": "desc"}],
"filter": {"logic": "and",
"filters": [
{"field": "employeeId", "operator": "eq", "value": self.get_current_user_id()},
{"field": "errorText", "operator": "eq", "value": "all"}]}}
filters["filter"]["filters"].append({
"field": "startDate",
"operator": "gte",
"value": start_date.strftime(DateFormats.SIMPLE_DATETIME.value),
})
filters["filter"]["filters"].append({
"field": "endDate",
"operator": "lte",
"value": end_date.strftime(DateFormats.SIMPLE_DATETIME.value),
})
request = self.session.post(self.baseurl + "Entry/GetEntries", json=filters, timeout=2)
request.raise_for_status()
return request.json()["data"]
def get_tracking_data(self) -> dict:
"""
Get the current tracking state.
This contains:
* current presence state
* last booking
* last booking date
* available quick actions
:return: dict of the info mentioned above
"""
request = self.session.get(self.baseurl + "Tracking/GetTrackingData", timeout=2)
request.raise_for_status()
return request.json()
def get_account_information(self) -> list:
"""
Get the current account information.
* remaining vacation days
* time account balance
:return: list with mentioned infos
"""
request = self.session.get(self.baseurl + "Employee/GetAccountInformation", timeout=2)
request.raise_for_status()
return request.json()
class TimeBot:
def __init__(self, mobatime_api: MobatimeApi):
self.logger = logging.getLogger(self.__class__.__name__)
self.mobatime_api = mobatime_api
def punch_in(self, punch_datetime: datetime.datetime):
"""
:param datetime.datetime punch_datetime: datetime object
:raises: on status code != 2xx
"""
self.mobatime_api.save_entry(punch_datetime, PunchCodes.COMING.value, note="da").raise_for_status()
def punch_out(self, punch_datetime: datetime.datetime):
"""
:param datetime.datetime punch_datetime: datetime object
:raises: on status code != 2xx
"""
self.mobatime_api.save_entry(punch_datetime, PunchCodes.LEAVING.value, note="weg").raise_for_status()
def break_start(self, punch_datetime: datetime.datetime):
"""
:param datetime.datetime punch_datetime: datetime object
:raises: on status code != 2xx
"""
self.mobatime_api.save_entry(punch_datetime, PunchCodes.BREAK_START.value, note="pause").raise_for_status()
def break_end(self, punch_datetime: datetime.datetime):
"""
:param datetime.datetime punch_datetime: datetime object
:raises: on status code != 2xx
"""
self.mobatime_api.save_entry(punch_datetime, PunchCodes.BREAK_END.value, note="pause ende").raise_for_status()
def smart_punch(self, punch_datetime: datetime.datetime) -> None:
"""
Perform smart punch for today.
This tries to detect your last action and will create entries in to following order:
punch_in -> break_start -> break_end -> punch_out
:param datetime.datetime punch_datetime: a datetime object to identify the current day aka. today
"""
last_punch = self.mobatime_api.get_entries(
1,
start_date=punch_datetime.replace(hour=0, minute=0, second=0, microsecond=0),
end_date=punch_datetime.replace(hour=0, minute=0, second=0, microsecond=0),
)
method = None
if not last_punch or last_punch[0]["entryNumber"] is None:
self.logger.debug("could not detect any time entry for today... punching in")
method = "punch_in"
elif last_punch[0]["entryNumber"] == PunchCodes.COMING.value:
self.logger.debug("your last entry was `punch_in`... starting break")
method = "break_start"
elif last_punch[0]["entryNumber"] == PunchCodes.BREAK_START.value:
self.logger.debug("your last entry was `break_start`... ending break")
method = "break_end"
elif last_punch[0]["entryNumber"] == PunchCodes.BREAK_END.value:
self.logger.debug("your last entry was `break_end`... punching out")
method = "punch_out"
elif last_punch[0]["entryNumber"] == PunchCodes.LEAVING.value:
self.logger.error("your last entry was `punch_out`... punching in again with this command is not supported")
sys.exit(1)
if method is None:
self.logger.error("hit an unknown situation... detection failed; run with `-v` for more info")
self.logger.debug(f"last entry was: {last_punch}")
exit(1)
self.logger.info("running `{}` with date `{}` and time `{}`".format(
method,
punch_datetime.strftime(DateFormats.SIMPLE_DATE.value),
punch_datetime.strftime(DateFormats.SIMPLE_TIME.value),
))
getattr(self, method)(punch_datetime)
def status(self) -> str:
"""
Get and format Mobatime tracking data and and account information.
The output ist formatted to print it to the console.
:return: preformatted status text
"""
tracking_data = self.mobatime_api.get_tracking_data()
account_info = self.mobatime_api.get_account_information()
ret = "Tracking Info:\n"
ret += f" Current State: {tracking_data['actualState']}\n"
ret += f" Last Booking: {tracking_data['lastBooking']}\n"
ret += f" Last Booking Date: {tracking_data['lastBookingDate']}\n"
ret += "Account Infos:\n"
for i in account_info:
ret += f" {i['accountName']}: {i['value']}\n"
return ret
def get_hours_present(self) -> datetime.timedelta:
"""
Calculate today's hours present.
:return: timedelta object with today's hours present
"""
now = datetime.datetime.now()
punches = self.mobatime_api.get_entries(
None,
start_date=now.replace(hour=0, minute=0, second=0, microsecond=0),
end_date=now.replace(hour=0, minute=0, second=0, microsecond=0),
)
punch_out_time = now
if punches and punches[0]["entryNumber"] == PunchCodes.LEAVING.value:
punch_out_time = datetime.datetime.strptime(punches.pop(0)["dateTime"], DateFormats.FULL_DATETIME.value)
punch_in_time = None
if punches and punches[-1]["entryNumber"] == PunchCodes.COMING.value:
punch_in_time = datetime.datetime.strptime(punches.pop(-1)["dateTime"], DateFormats.FULL_DATETIME.value)
if punch_in_time is None:
return datetime.timedelta(seconds=0)
breaks = []
end = now
for i in punches:
if i["entryNumber"] == PunchCodes.BREAK_END.value:
end = datetime.datetime.strptime(i["dateTime"], DateFormats.FULL_DATETIME.value)
if i["entryNumber"] == PunchCodes.BREAK_START.value:
start = datetime.datetime.strptime(i["dateTime"], DateFormats.FULL_DATETIME.value)
breaks.append(end - start)
end = now
time_delta = punch_out_time - punch_in_time
for i in breaks:
time_delta = time_delta - i
return time_delta