Mobatime command-line utility
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
timebot/timebot/timebot.py

295 lines
11 KiB

import datetime
import getpass
import logging
import pickle
import sys
import requests as requests
from timebot.constants import COMING_ENTRY_CODE_ID, LEAVING_ENTRY_CODE_ID, BREAK_START_ENTRY_CODE_ID, \
BREAK_END_ENTRY_CODE_ID, SIMPLE_DATE_FORMAT, SIMPLE_TIME_FORMAT, SIMPLE_DATETIME_FORMAT, \
USER_AGENT
class MobatimeApi:
    """
    Small HTTP client for a Mobatime web instance, built on a ``requests`` session.

    The session is created lazily on first access of :attr:`session`. If
    ``save_session`` is enabled, the session cookies are pickled to
    ``cookie_store`` after a login and loaded again on the next start.
    """

    def __init__(self, baseurl: str, user: str, password: str = None, ask_for_password: bool = False,
                 save_session: bool = False, cookie_store: str = ".kekse"):
        """
        :param str baseurl: base URL of the Mobatime instance; a trailing ``/`` is appended if missing
        :param str user: user name sent to ``Account/LogOn``
        :param str password: password for ``user``; may be ``None`` if ``ask_for_password`` is set
        :param bool ask_for_password: prompt interactively when a login is needed and no password is set
        :param bool save_session: persist/restore session cookies via ``cookie_store``
        :param str cookie_store: path of the pickle file holding the session cookies
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.baseurl = self._sanitize_baseurl(baseurl)
        self.user = user
        self.password = password
        self._ask_for_password = ask_for_password
        self._save_session = save_session
        self._session = None  # created lazily by the ``session`` property
        self._cookie_store = cookie_store
        self._current_user = None  # cached result of get_user_profile_for_current_user()

    @staticmethod
    def _sanitize_baseurl(baseurl: str):
        """
        Make sure the base URL ends with a slash so endpoint paths can simply be appended.

        :param str baseurl: base URL with or without trailing slash
        :return: base URL with exactly the original content plus a trailing slash if it was missing
        """
        if baseurl.endswith("/"):
            return baseurl
        return baseurl + "/"

    @property
    def session(self):
        """
        Return requests session object and auto login if necessary.

        On first access a new session is created, stored cookies are loaded
        (if enabled) and a probe request decides whether a login is required.
        The session is only cached once that succeeded, so a failed login is
        retried on the next access instead of leaving a dead session behind.

        :return: requests session
        :raises: on any http error
        """
        if self._session is None:
            session = requests.Session()
            session.headers.update({"User-Agent": USER_AGENT})
            try:
                self._load_session_cookies(session)
            except FileNotFoundError as e:
                self.logger.warning(e)  # file does not exist... ignored
            except (EOFError, pickle.UnpicklingError) as e:
                self.logger.warning(e)  # file seems to be corrupt... ignoring

            # Probe an endpoint to find out whether the (restored) cookies are still accepted.
            response = session.get(self.baseurl + "Employee/GetEmployeeList")
            if 400 <= response.status_code < 500:
                self.logger.debug(f"got error {response.status_code}... trying to log in")
                self._login(session)
                self._save_session_cookies(session)
            else:
                response.raise_for_status()

            # Cache only after probe/login went through without raising.
            self._session = session
        return self._session

    def _login(self, session: requests.Session):
        """
        Clear existing session cookies and try new login.

        If no password is set, the user is prompted when ``ask_for_password``
        was enabled; otherwise the login is aborted. A password that was passed
        to the constructor is used as is.

        :param requests.Session session: the session to log in with
        :raises Exception: if no password is set and prompting is disabled
        :raises: on status code != 2xx
        """
        if self.password is None:
            if not self._ask_for_password:
                raise Exception("could not obtain password")
            self.password = self._get_password()

        login_data = {
            "username": self.user,
            "password": self.password,
        }
        session.cookies.clear_session_cookies()
        response = session.post(self.baseurl + "Account/LogOn", data=login_data)
        response.raise_for_status()

    @staticmethod
    def _get_password():
        """
        Ask the user for his password.

        :return: the users password
        """
        return getpass.getpass("Enter your password: ")

    def _save_session_cookies(self, session: requests.Session):
        """
        Save the session cookies as pickle file.

        Does nothing unless ``save_session`` was enabled.

        :param requests.Session session: the requests session to extract the cookies from
        """
        if self._save_session:
            with open(self._cookie_store, "wb") as f:
                self.logger.debug("pickling session cookies")
                pickle.dump(requests.utils.dict_from_cookiejar(session.cookies), f)

    def _load_session_cookies(self, session: requests.Session):
        """
        Load the session cookies from the pickle file and updates the given session.

        Does nothing unless ``save_session`` was enabled.

        :param requests.Session session: the requests session which will be updated with the loaded cookies
        :raises FileNotFoundError: if the cookie store does not exist
        :raises EOFError, pickle.UnpicklingError: if the cookie store cannot be unpickled
        """
        if self._save_session:
            with open(self._cookie_store, "rb") as f:
                self.logger.debug("loading pickled cookies")
                # SECURITY NOTE: unpickling executes arbitrary code from the file; the cookie
                # store must only ever be a file written by this tool in a trusted location.
                session.cookies.update(requests.utils.cookiejar_from_dict(pickle.load(f)))

    @property
    def current_user(self):
        """
        Returns all user information for the current user.

        The profile is fetched once and cached on the instance.

        :return: all user information as dict
        """
        if self._current_user is None:
            self._current_user = self.get_user_profile_for_current_user()
        return self._current_user

    def get_current_user_id(self):
        """
        Get the current users id.

        :return: user id, read from ``current_user["employee"]["id"]``
        """
        return self.current_user["employee"]["id"]

    def get_employees(self):
        """
        List the employees returned by ``Employee/GetEmployees``
        (presumably the members of your own team).

        :return: decoded JSON response
        :raises: on status code != 2xx
        """
        response = self.session.get(self.baseurl + "Employee/GetEmployees")
        response.raise_for_status()
        return response.json()

    def get_user_profile_for_current_user(self):
        """
        Returns all user information for the current user.

        :return: all user information as dict
        :raises: on status code != 2xx
        """
        response = self.session.get(self.baseurl + "Employee/GetUserProfileForCurrentUser")
        response.raise_for_status()
        return response.json()

    def save_entry(self, punch_datetime: datetime.datetime, entry_code: int, note: str = None) -> requests.Response:
        """
        Add mobatime entry.

        The response is returned unchecked; the caller decides how to handle
        a non-2xx status.

        :param datetime.datetime punch_datetime: datetime object
        :param int entry_code: entry type code
        :param str note: free text note added to the Mobatime entry
        :return: requests response object
        """
        punch_date = punch_datetime.strftime(SIMPLE_DATE_FORMAT)
        punch_time = punch_datetime.strftime(SIMPLE_TIME_FORMAT)
        entry_data = {
            "periode0Date": punch_date,
            "periode0Time": punch_time,
            "selectedEntryCode": entry_code,
            "selectedPeriodType": 0,
            "employeeId": self.get_current_user_id(),
        }
        if note:
            entry_data["note"] = note
        return self.session.post(self.baseurl + "Entry/SaveEntry", data=entry_data)

    def get_entries(self, entries: int = 10,
                    start_date: datetime.datetime = None,
                    end_date: datetime.datetime = None) -> list:
        """
        List time entries of the current user, newest first.

        :param int entries: number of entries to return
        :param datetime.datetime start_date: start date of request; by default ``end_date`` - 10 days
        :param datetime.datetime end_date: end date of request; usually the more recent date,
            defaults to now
        :return: the ``data`` list of the JSON response
        :raises: on status code != 2xx
        """
        if not end_date:
            end_date = datetime.datetime.now()
        if not start_date:
            start_date = end_date - datetime.timedelta(days=10)

        filters = {
            "take": entries,
            "skip": 0,
            "page": 1,
            "pageSize": entries,
            "sort": [{"field": "dateTime", "dir": "desc"}],
            "filter": {
                "logic": "and",
                "filters": [
                    {"field": "employeeId", "operator": "eq", "value": self.get_current_user_id()},
                    {"field": "errorText", "operator": "eq", "value": "all"},
                    {
                        "field": "startDate",
                        "operator": "gte",
                        "value": start_date.strftime(SIMPLE_DATETIME_FORMAT),
                    },
                    {
                        "field": "endDate",
                        "operator": "lte",
                        "value": end_date.strftime(SIMPLE_DATETIME_FORMAT),
                    },
                ],
            },
        }
        response = self.session.post(self.baseurl + "Entry/GetEntries", json=filters)
        response.raise_for_status()
        return response.json()["data"]

    def get_tracking_data(self) -> dict:
        """
        Get the current tracking state.

        This contains:

        * current presence state
        * last booking
        * last booking date
        * available quick actions

        :return: dict of the info mentioned above
        :raises: on status code != 2xx
        """
        response = self.session.get(self.baseurl + "Tracking/GetTrackingData")
        response.raise_for_status()
        return response.json()

    def get_account_information(self) -> list:
        """
        Get the current account information.

        * remaining vacation days
        * time account balance

        :return: list with mentioned infos
        :raises: on status code != 2xx
        """
        response = self.session.get(self.baseurl + "Employee/GetAccountInformation")
        response.raise_for_status()
        return response.json()
class TimeBot:
    """
    High-level punch operations on top of a :class:`MobatimeApi` instance.
    """

    def __init__(self, mobatime_api: MobatimeApi):
        """
        :param MobatimeApi mobatime_api: API client used for all requests
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.mobatime_api = mobatime_api

    def punch_in(self, punch_datetime: datetime.datetime) -> None:
        """
        Save a "coming" entry.

        :param datetime.datetime punch_datetime: datetime object
        :raises: on status code != 2xx
        """
        self.mobatime_api.save_entry(punch_datetime, COMING_ENTRY_CODE_ID, note="da").raise_for_status()

    def punch_out(self, punch_datetime: datetime.datetime) -> None:
        """
        Save a "leaving" entry.

        :param datetime.datetime punch_datetime: datetime object
        :raises: on status code != 2xx
        """
        self.mobatime_api.save_entry(punch_datetime, LEAVING_ENTRY_CODE_ID, note="weg").raise_for_status()

    def break_start(self, punch_datetime: datetime.datetime) -> None:
        """
        Save a "break start" entry.

        :param datetime.datetime punch_datetime: datetime object
        :raises: on status code != 2xx
        """
        self.mobatime_api.save_entry(punch_datetime, BREAK_START_ENTRY_CODE_ID, note="pause").raise_for_status()

    def break_end(self, punch_datetime: datetime.datetime) -> None:
        """
        Save a "break end" entry.

        :param datetime.datetime punch_datetime: datetime object
        :raises: on status code != 2xx
        """
        self.mobatime_api.save_entry(punch_datetime, BREAK_END_ENTRY_CODE_ID, note="pause ende").raise_for_status()

    def smart_punch(self, punch_datetime: datetime.datetime) -> None:
        """
        Pick the next punch action from the latest entry since midnight of ``punch_datetime``.

        Sequence: no entry -> ``punch_in`` -> ``break_start`` -> ``break_end`` -> ``punch_out``.
        After a ``punch_out``, or when the last entry cannot be classified, the
        process is terminated with exit code 1.

        :param datetime.datetime punch_datetime: datetime object used for the new entry
        :raises SystemExit: if no follow-up action can be determined
        :raises: on status code != 2xx
        """
        day_start = punch_datetime.replace(hour=0, minute=0, second=0, microsecond=0)
        last_punch = self.mobatime_api.get_entries(1, day_start)

        method = None
        if not last_punch or last_punch[0]["entryNumber"] is None:
            self.logger.debug("could not detect any time entry for today... punching in")
            method = "punch_in"
        elif last_punch[0]["entryNumber"] == COMING_ENTRY_CODE_ID:
            self.logger.debug("your last entry was `punch_in`... starting break")
            method = "break_start"
        elif last_punch[0]["entryNumber"] == BREAK_START_ENTRY_CODE_ID:
            self.logger.debug("your last entry was `break_start`... ending break")
            method = "break_end"
        elif last_punch[0]["entryNumber"] == BREAK_END_ENTRY_CODE_ID:
            self.logger.debug("your last entry was `break_end`... punching out")
            method = "punch_out"
        elif last_punch[0]["entryNumber"] == LEAVING_ENTRY_CODE_ID:
            self.logger.error("your last entry was `punch_out`... punching in again with this command is not supported")
            sys.exit(1)

        if method is None:
            self.logger.error("hit an unknown situation... detection failed; run with `-v` for more info")
            self.logger.debug(f"last entry was: {last_punch}")
            # sys.exit instead of the site-provided exit(), which is not guaranteed to exist
            sys.exit(1)

        self.logger.info("running `{}` with date `{}` and time `{}`".format(
            method,
            punch_datetime.strftime(SIMPLE_DATE_FORMAT),
            punch_datetime.strftime(SIMPLE_TIME_FORMAT),
        ))
        getattr(self, method)(punch_datetime)