You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
382 lines
15 KiB
382 lines
15 KiB
import argparse
|
|
import configparser
|
|
import datetime
|
|
import getpass
|
|
import logging
|
|
import pickle
|
|
import sys
|
|
|
|
import requests as requests
|
|
|
|
from constants import COMING_ENTRY_CODE_ID, LEAVING_ENTRY_CODE_ID, PUNCH_COMMANDS, BREAK_START_ENTRY_CODE_ID, \
|
|
BREAK_END_ENTRY_CODE_ID, SIMPLE_DATE_FORMAT, SIMPLE_TIME_FORMAT, SIMPLE_DATETIME_FORMAT, \
|
|
SIMPLE_DATETIME_FORMAT_HUMAN, USER_AGENT
|
|
|
|
logger = logging.getLogger()
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
class TimeBot:
    """
    Small client for the Mobatime web interface.

    The client lazily creates a ``requests`` session on first use, logs in when the
    server rejects the initial probe request and can optionally persist the session
    cookies in the file ``.kekse`` in the current working directory.
    """

    def __init__(self, baseurl: str, user: str, password: str = None, ask_for_password: bool = False,
                 save_session: bool = False):
        """
        :param str baseurl: base url of the web interface; a trailing slash is added if missing
        :param str user: login user name
        :param str password: login password; may be ``None``
        :param bool ask_for_password: prompt for the password on login if none was given
        :param bool save_session: load/save the session cookies from/to ``.kekse``
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.baseurl = self._sanitize_baseurl(baseurl)
        self.user = user
        self.password = password
        self._ask_for_password = ask_for_password
        self._save_session = save_session
        self._session = None  # created lazily by the ``session`` property
        self._current_user = None  # cached user profile, see ``current_user``

    @staticmethod
    def _sanitize_baseurl(baseurl: str):
        """
        Make sure the base url ends with a slash so endpoint paths can simply be appended.

        :param str baseurl: base url with or without trailing slash
        :return: base url with exactly the given content plus a trailing slash if it was missing
        """
        return baseurl if baseurl.endswith("/") else baseurl + "/"

    @property
    def session(self):
        """
        Return requests session object and auto login if necessary.

        On first access a new session is created, stored cookies are loaded (if enabled)
        and a probe request is sent. A 4xx answer to the probe triggers a login.

        :return: requests session
        :raises: on any http error
        """
        if self._session is None:
            session = requests.Session()
            session.headers.update({"User-Agent": USER_AGENT})
            try:
                self._load_session_cookies(session)
            except FileNotFoundError as e:
                self.logger.warning(e)  # file does not exist... ignored
            except (EOFError, pickle.UnpicklingError) as e:
                self.logger.warning(e)  # file seems to be corrupt... ignoring
            probe = session.get(self.baseurl + "Employee/GetEmployeeList")
            if 400 <= probe.status_code < 500:
                self.logger.debug("got error %s... trying to log in", probe.status_code)
                self._login(session)
                self._save_session_cookies(session)
            else:
                probe.raise_for_status()
            # Only publish the session once probe/login succeeded; otherwise a failed
            # login would leave an unauthenticated session cached for later calls.
            self._session = session
        return self._session

    def _login(self, session: "requests.Session"):
        """
        Clear existing session cookies and try new login.

        If no password is set, the user is asked for it when ``ask_for_password`` was enabled.

        :param requests.Session session: the session to log in with
        :raises Exception: if no password is set and asking for it is not allowed
        :raises: on status code != 2xx
        """
        if self.password is None:
            # Only a *missing* password is an error; an explicitly given one is used as is.
            if not self._ask_for_password:
                raise Exception("could not obtain password")
            self.password = self._get_password()
        login_data = {
            "username": self.user,
            "password": self.password,
        }
        session.cookies.clear_session_cookies()
        response = session.post(self.baseurl + "Account/LogOn", data=login_data)
        response.raise_for_status()

    @staticmethod
    def _get_password():
        """
        Ask the user for his password.

        :return: the users password
        """
        return getpass.getpass("Enter your password: ")

    def _save_session_cookies(self, session: "requests.Session"):
        """
        Save the session cookies as pickle file.

        Does nothing unless ``save_session`` was enabled.

        :param requests.Session session: the requests session to extract the cookies from
        """
        if not self._save_session:
            return
        with open(".kekse", "wb") as f:
            self.logger.debug("pickling session cookies")
            pickle.dump(requests.utils.dict_from_cookiejar(session.cookies), f)

    def _load_session_cookies(self, session: "requests.Session"):
        """
        Load the session cookies from the pickle file and updates the given session.

        Does nothing unless ``save_session`` was enabled.

        :param requests.Session session: the requests session which will be updated with the loaded cookies
        :raises FileNotFoundError: if the cookie file does not exist
        """
        if not self._save_session:
            return
        with open(".kekse", "rb") as f:
            self.logger.debug("loading pickled cookies")
            # NOTE(review): unpickling executes arbitrary code if `.kekse` was tampered
            # with; only keep this if the working directory is trusted.
            session.cookies.update(requests.utils.cookiejar_from_dict(pickle.load(f)))

    def add_entry(self, punch_datetime: datetime.datetime, entry_code: int, note: str = None) -> "requests.Response":
        """
        Add mobatime entry.

        :param datetime.datetime punch_datetime: datetime object
        :param int entry_code: entry type code
        :param str note: free text note added to the Mobatime entry; omitted if empty
        :return: requests response object (status is *not* checked here)
        """
        entry_data = {
            "periode0Date": punch_datetime.strftime(SIMPLE_DATE_FORMAT),
            "periode0Time": punch_datetime.strftime(SIMPLE_TIME_FORMAT),
            "selectedEntryCode": entry_code,
            "selectedPeriodType": 0,
            "employeeId": self.get_current_user_id(),
        }
        if note:
            entry_data["note"] = note
        return self.session.post(self.baseurl + "Entry/SaveEntry", data=entry_data)

    def list_employees(self):
        """
        List all employees which are obviously in the same team as you.

        :return: list of employees
        :raises: on status code != 2xx
        """
        response = self.session.get(self.baseurl + "Employee/GetEmployees")
        response.raise_for_status()
        return response.json()

    def _get_user_profile_for_current_user(self):
        """
        Request all user information for the current user from the server.

        :return: all user information as dict
        :raises: on status code != 2xx
        """
        response = self.session.get(self.baseurl + "Employee/GetUserProfileForCurrentUser")
        response.raise_for_status()
        return response.json()

    @property
    def current_user(self):
        """
        Returns all user information for the current user.

        The profile is requested once and cached on the instance afterwards.

        :return: all user information as dict
        """
        if self._current_user is None:
            self._current_user = self._get_user_profile_for_current_user()
        return self._current_user

    def get_current_user_id(self):
        """
        Get the current users id.

        :return: user id
        """
        return self.current_user["employee"]["id"]

    def get_entries(self, entries: int = 10,
                    start_date: datetime.datetime = None,
                    end_date: datetime.datetime = None) -> list:
        """
        List time entries.

        :param int entries: number of entries to return
        :param datetime.datetime start_date: start date of request; by default ``end_date`` - 10 days
        :param datetime.datetime end_date: end date of request; usually the more recent date; by default now
        :return: the ``data`` member of the JSON response
        :raises: on status code != 2xx
        """
        if not end_date:
            end_date = datetime.datetime.now()
        if not start_date:
            start_date = end_date - datetime.timedelta(days=10)
        filters = {
            "take": entries,
            "skip": 0,
            "page": 1,
            "pageSize": entries,
            "sort": [{"field": "dateTime", "dir": "desc"}],
            "filter": {
                "logic": "and",
                "filters": [
                    {"field": "employeeId", "operator": "eq", "value": self.get_current_user_id()},
                    {"field": "errorText", "operator": "eq", "value": "all"},
                    {"field": "startDate", "operator": "gte",
                     "value": start_date.strftime(SIMPLE_DATETIME_FORMAT)},
                    {"field": "endDate", "operator": "lte",
                     "value": end_date.strftime(SIMPLE_DATETIME_FORMAT)},
                ],
            },
        }
        response = self.session.post(self.baseurl + "Entry/GetEntries", json=filters)
        response.raise_for_status()
        return response.json()["data"]

    def get_tracking_data(self) -> dict:
        """
        Get the current tracking state.

        This contains:

        * current presence state
        * last booking
        * last booking date
        * available quick actions

        :return: dict of the info mentioned above
        :raises: on status code != 2xx
        """
        response = self.session.get(self.baseurl + "Tracking/GetTrackingData")
        response.raise_for_status()
        return response.json()

    def get_account_information(self) -> list:
        """
        Get the current account information.

        * remaining vacation days
        * time account balance

        :return: list with mentioned infos
        :raises: on status code != 2xx
        """
        response = self.session.get(self.baseurl + "Employee/GetAccountInformation")
        response.raise_for_status()
        return response.json()

    def punch_in(self, punch_datetime: datetime.datetime):
        """
        Create a "coming" entry with the note ``da``.

        :param datetime.datetime punch_datetime: datetime object
        :raises: on status code != 2xx
        """
        self.add_entry(punch_datetime, COMING_ENTRY_CODE_ID, note="da").raise_for_status()

    def punch_out(self, punch_datetime: datetime.datetime):
        """
        Create a "leaving" entry with the note ``weg``.

        :param datetime.datetime punch_datetime: datetime object
        :raises: on status code != 2xx
        """
        self.add_entry(punch_datetime, LEAVING_ENTRY_CODE_ID, note="weg").raise_for_status()

    def break_start(self, punch_datetime: datetime.datetime):
        """
        Create a "break start" entry with the note ``pause``.

        :param datetime.datetime punch_datetime: datetime object
        :raises: on status code != 2xx
        """
        self.add_entry(punch_datetime, BREAK_START_ENTRY_CODE_ID, note="pause").raise_for_status()

    def break_end(self, punch_datetime: datetime.datetime):
        """
        Create a "break end" entry with the note ``pause ende``.

        :param datetime.datetime punch_datetime: datetime object
        :raises: on status code != 2xx
        """
        self.add_entry(punch_datetime, BREAK_END_ENTRY_CODE_ID, note="pause ende").raise_for_status()
|
|
|
|
|
|
if __name__ == '__main__':
    # Command line entry point: parse the arguments, build a TimeBot from the config
    # file and dispatch to the selected sub-command. Exits with 0 on success and 1 if
    # nothing could be done.
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", help="enable debug logging", action="store_true")
    parser.add_argument("-u", help="mobatime login user", required=True)
    parser.add_argument("-p", help="mobatime login user password", default=None)
    parser.add_argument("-c", help="config file", default="timebot.ini")
    parser.add_argument("--save-cookies", help="save auth cookies to `./.kekse`", action="store_true", default=False)
    subparsers = parser.add_subparsers(help='sub-command help', dest='subparser_name')

    # subparser command: `status`
    parser_status = subparsers.add_parser("status", help="show your current tracking status")

    # subparser command: `list-entries`
    parser_list_entries = subparsers.add_parser("list-entries", help="use this command to list your time entries")
    parser_list_entries.add_argument("--start-date",
                                     help=f"start date filter in format `{SIMPLE_DATETIME_FORMAT_HUMAN}` "
                                          f"(default: now - 10days; unset for default)")
    parser_list_entries.add_argument("--end-date",
                                     help=f"end date filter in format `{SIMPLE_DATETIME_FORMAT_HUMAN}` "
                                          f"(default: now; unset for default)")
    parser_list_entries.add_argument("--items", help="max items to request per page", default=20, type=int)

    # subparser command: `punch`
    parser_punch = subparsers.add_parser("punch",
                                         help="use this command to punch in, punch out, or create break entries")
    parser_punch.add_argument("-t",
                              help=f"type of time entry; this can be {', '.join(PUNCH_COMMANDS)}",
                              default="punch_in",
                              choices=PUNCH_COMMANDS)
    parser_punch.add_argument("-s", help=f"timestamp in format `{SIMPLE_DATETIME_FORMAT_HUMAN}` or `now`",
                              default="now")

    # subparser command: `smart-punch`
    parser_smart_punch = subparsers.add_parser("smart-punch",
                                               help="use this command to auto punch in, punch out and create break "
                                                    "entries; this command tries to detect your last action and will "
                                                    "create entries in to following order: "
                                                    "punch_in(now) -> break_start(now) -> break_end(now) "
                                                    "-> punch_out(now)")

    args = parser.parse_args()

    if args.v:
        logger.setLevel(logging.DEBUG)

    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips unreadable files, so check explicitly that the
    # required option is really there instead of failing with a bare KeyError below.
    config.read(args.c)
    if not config.has_option("general", "baseurl"):
        logger.error("could not read option `baseurl` of section `general` from config file `%s`", args.c)
        sys.exit(1)

    tb = TimeBot(baseurl=config["general"]["baseurl"], user=args.u, password=args.p, ask_for_password=True,
                 save_session=args.save_cookies)

    if args.subparser_name == "punch":
        if args.s == "now":
            punch_datetime = datetime.datetime.now()
        else:
            punch_datetime = datetime.datetime.strptime(args.s, SIMPLE_DATETIME_FORMAT)
        logger.info("running `%s` with date `%s` and time `%s`",
                    args.t,
                    punch_datetime.strftime(SIMPLE_DATE_FORMAT),
                    punch_datetime.strftime(SIMPLE_TIME_FORMAT))
        # `-t` is restricted to PUNCH_COMMANDS by argparse; the value is used as method name
        getattr(tb, args.t)(punch_datetime)
    elif args.subparser_name == "smart-punch":
        now = datetime.datetime.now()
        # newest entry since midnight decides which action comes next
        last_punch = tb.get_entries(1, now.replace(hour=0, minute=0, second=0, microsecond=0))
        method = None
        if not last_punch:
            logger.debug("could not detect any time entry for today... punching in")
            method = "punch_in"
        elif last_punch[0]["entryNumber"] == COMING_ENTRY_CODE_ID:
            logger.debug("your last entry was `punch_in`... starting break")
            method = "break_start"
        elif last_punch[0]["entryNumber"] == BREAK_START_ENTRY_CODE_ID:
            logger.debug("your last entry was `break_start`... ending break")
            method = "break_end"
        elif last_punch[0]["entryNumber"] == BREAK_END_ENTRY_CODE_ID:
            logger.debug("your last entry was `break_end`... punching out")
            method = "punch_out"
        elif last_punch[0]["entryNumber"] == LEAVING_ENTRY_CODE_ID:
            logger.error("your last entry was `punch_out`... punching in again with this command is not supported")
            sys.exit(1)
        if method is None:
            logger.error("hit an unknown situation... detection failed; run with `-v` for more info")
            logger.debug("last entry was: %s", last_punch)
            # sys.exit instead of the `site` helper `exit`, which is not guaranteed to exist
            sys.exit(1)
        logger.info("running `%s` with date `%s` and time `%s`",
                    method,
                    now.strftime(SIMPLE_DATE_FORMAT),
                    now.strftime(SIMPLE_TIME_FORMAT))
        getattr(tb, method)(now)
    elif args.subparser_name == "list-entries":
        end_date = None
        if args.end_date:
            end_date = datetime.datetime.strptime(args.end_date, SIMPLE_DATETIME_FORMAT)
        start_date = None
        if args.start_date:
            start_date = datetime.datetime.strptime(args.start_date, SIMPLE_DATETIME_FORMAT)
        data = tb.get_entries(entries=args.items, start_date=start_date, end_date=end_date)
        # entries are requested newest first; print them oldest first
        for entry in reversed(data):
            print(f"Entry: {entry['entryName']} - DateTime: {entry['dateTime']} - Note: {entry['note']}")
    elif args.subparser_name == "status":
        tracking_data = tb.get_tracking_data()
        account_info = tb.get_account_information()
        print("Tracking Info:")
        print(f" Current State: {tracking_data['actualState']}")
        print(f" Last Booking: {tracking_data['lastBooking']}")
        print(f" Last Booking Date: {tracking_data['lastBookingDate']}")
        print("Account Infos:")
        for account in account_info:
            print(f" {account['accountName']}: {account['value']}")
    else:
        # no (known) sub-command given
        logger.error("Noting done... dunno what you want!")
        sys.exit(1)

    sys.exit(0)
|
|
|