|
|
|
@ -3,17 +3,19 @@ import getpass |
|
|
|
|
import logging |
|
|
|
|
import pickle |
|
|
|
|
import sys |
|
|
|
|
from typing import Union |
|
|
|
|
|
|
|
|
|
import requests as requests |
|
|
|
|
|
|
|
|
|
from timebot.constants import COMING_ENTRY_CODE_ID, LEAVING_ENTRY_CODE_ID, BREAK_START_ENTRY_CODE_ID, \ |
|
|
|
|
BREAK_END_ENTRY_CODE_ID, SIMPLE_DATE_FORMAT, SIMPLE_TIME_FORMAT, SIMPLE_DATETIME_FORMAT, \ |
|
|
|
|
USER_AGENT |
|
|
|
|
|
|
|
|
|
from timebot.constants import PunchCodes, DateFormats, USER_AGENT |
|
|
|
|
|
|
|
|
|
package_logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TimebotObtainPasswordError(Exception): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TimeParseError(Exception): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
@ -21,20 +23,21 @@ class TimeParseError(Exception): |
|
|
|
|
def parse_user_time_input(stamp: str) -> datetime.datetime: |
|
|
|
|
errors = [] |
|
|
|
|
try: |
|
|
|
|
package_logger.debug(f"trying to parse date with format \"{SIMPLE_DATETIME_FORMAT}\"") |
|
|
|
|
return datetime.datetime.strptime(stamp, SIMPLE_DATETIME_FORMAT) |
|
|
|
|
package_logger.debug(f"trying to parse date with format \"{DateFormats.SIMPLE_DATETIME.value}\"") |
|
|
|
|
return datetime.datetime.strptime(stamp, DateFormats.SIMPLE_DATETIME.value) |
|
|
|
|
except ValueError as e: |
|
|
|
|
errors.append(e) |
|
|
|
|
try: |
|
|
|
|
package_logger.debug(f"trying to parse date with format \"{SIMPLE_TIME_FORMAT}\"") |
|
|
|
|
t = datetime.datetime.strptime(stamp, SIMPLE_TIME_FORMAT) |
|
|
|
|
package_logger.debug(f"trying to parse date with format \"{DateFormats.SIMPLE_TIME}\"") |
|
|
|
|
t = datetime.datetime.strptime(stamp, DateFormats.SIMPLE_TIME.value) |
|
|
|
|
dt = datetime.datetime.now() |
|
|
|
|
dt = dt.replace(hour=0, minute=0, second=0, microsecond=0) |
|
|
|
|
dt += datetime.timedelta(hours=t.hour, minutes=t.minute) |
|
|
|
|
return dt |
|
|
|
|
except ValueError as e: |
|
|
|
|
errors.append(e) |
|
|
|
|
package_logger.error(f"could not parse date with format \"{SIMPLE_DATETIME_FORMAT}\" or \"{SIMPLE_TIME_FORMAT}\"") |
|
|
|
|
package_logger.error(f"could not parse date with format \"{DateFormats.SIMPLE_DATETIME.value}\" or" |
|
|
|
|
f" \"{DateFormats.SIMPLE_TIME.value}\"") |
|
|
|
|
raise TimeParseError(errors) |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -45,7 +48,7 @@ class MobatimeApi: |
|
|
|
|
self.baseurl = self._sanitize_baseurl(baseurl) |
|
|
|
|
self.user = user |
|
|
|
|
self.password = password |
|
|
|
|
self._ask_for_password = ask_for_password |
|
|
|
|
self.ask_for_password = ask_for_password |
|
|
|
|
self._save_session = save_session |
|
|
|
|
self._session = None |
|
|
|
|
self._cookie_store = cookie_store |
|
|
|
@ -74,7 +77,7 @@ class MobatimeApi: |
|
|
|
|
self.logger.warning(e) # file does not exist... ignored |
|
|
|
|
except (EOFError, pickle.UnpicklingError) as e: |
|
|
|
|
self.logger.warning(e) # file seems to be corrupt... ignoring |
|
|
|
|
request = self._session.get(self.baseurl + "Employee/GetEmployeeList") |
|
|
|
|
request = self._session.get(self.baseurl + "Employee/GetEmployeeList", timeout=2) |
|
|
|
|
if 400 <= request.status_code < 500: |
|
|
|
|
self.logger.debug(f"got error {request.status_code}... trying to log in") |
|
|
|
|
self._login(self._session) |
|
|
|
@ -89,17 +92,21 @@ class MobatimeApi: |
|
|
|
|
|
|
|
|
|
:raises: on status code != 2xx |
|
|
|
|
""" |
|
|
|
|
if self.password is None and self._ask_for_password: |
|
|
|
|
if self.password is None and self.ask_for_password: |
|
|
|
|
self.password = self._get_password() |
|
|
|
|
elif self.password and not self.ask_for_password: |
|
|
|
|
pass |
|
|
|
|
else: |
|
|
|
|
raise Exception("could not obtain password") |
|
|
|
|
raise TimebotObtainPasswordError("Unauthorized: could not obtain password") |
|
|
|
|
login_data = { |
|
|
|
|
"username": self.user, |
|
|
|
|
"password": self.password, |
|
|
|
|
} |
|
|
|
|
session.cookies.clear_session_cookies() |
|
|
|
|
request = session.post(self.baseurl + "Account/LogOn", data=login_data) |
|
|
|
|
request.raise_for_status() |
|
|
|
|
session.post(self.baseurl + "Account/LogOn", |
|
|
|
|
data=login_data, |
|
|
|
|
timeout=2).raise_for_status() # This always gives 200 ... even with wrong password |
|
|
|
|
session.get(self.baseurl + "Employee/GetEmployeeList", timeout=2).raise_for_status() |
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
def _get_password(): |
|
|
|
@ -156,7 +163,7 @@ class MobatimeApi: |
|
|
|
|
List all employees which are obviously in the same team as you. |
|
|
|
|
:return: list of employees |
|
|
|
|
""" |
|
|
|
|
request = self.session.get(self.baseurl + "Employee/GetEmployees") |
|
|
|
|
request = self.session.get(self.baseurl + "Employee/GetEmployees", timeout=2) |
|
|
|
|
request.raise_for_status() |
|
|
|
|
return request.json() |
|
|
|
|
|
|
|
|
@ -166,7 +173,7 @@ class MobatimeApi: |
|
|
|
|
|
|
|
|
|
:return: all user information as dict |
|
|
|
|
""" |
|
|
|
|
request = self.session.get(self.baseurl + "Employee/GetUserProfileForCurrentUser") |
|
|
|
|
request = self.session.get(self.baseurl + "Employee/GetUserProfileForCurrentUser", timeout=2) |
|
|
|
|
request.raise_for_status() |
|
|
|
|
return request.json() |
|
|
|
|
|
|
|
|
@ -179,8 +186,8 @@ class MobatimeApi: |
|
|
|
|
:param str note: free text note added to the Mobatime entry |
|
|
|
|
:return: requests response object |
|
|
|
|
""" |
|
|
|
|
punch_date = punch_datetime.strftime(SIMPLE_DATE_FORMAT) |
|
|
|
|
punch_time = punch_datetime.strftime(SIMPLE_TIME_FORMAT) |
|
|
|
|
punch_date = punch_datetime.strftime(DateFormats.SIMPLE_DATE.value) |
|
|
|
|
punch_time = punch_datetime.strftime(DateFormats.SIMPLE_TIME.value) |
|
|
|
|
entry_data = { |
|
|
|
|
"periode0Date": punch_date, |
|
|
|
|
"periode0Time": punch_time, |
|
|
|
@ -190,10 +197,10 @@ class MobatimeApi: |
|
|
|
|
} |
|
|
|
|
if note: |
|
|
|
|
entry_data["note"] = note |
|
|
|
|
request = self.session.post(self.baseurl + "Entry/SaveEntry", data=entry_data) |
|
|
|
|
request = self.session.post(self.baseurl + "Entry/SaveEntry", data=entry_data, timeout=2) |
|
|
|
|
return request |
|
|
|
|
|
|
|
|
|
def get_entries(self, entries: int = 10, |
|
|
|
|
def get_entries(self, entries: Union[str, None] = 10, |
|
|
|
|
start_date: datetime.datetime = None, |
|
|
|
|
end_date: datetime.datetime = None) -> list: |
|
|
|
|
""" |
|
|
|
@ -217,14 +224,14 @@ class MobatimeApi: |
|
|
|
|
filters["filter"]["filters"].append({ |
|
|
|
|
"field": "startDate", |
|
|
|
|
"operator": "gte", |
|
|
|
|
"value": start_date.strftime(SIMPLE_DATETIME_FORMAT), |
|
|
|
|
"value": start_date.strftime(DateFormats.SIMPLE_DATETIME.value), |
|
|
|
|
}) |
|
|
|
|
filters["filter"]["filters"].append({ |
|
|
|
|
"field": "endDate", |
|
|
|
|
"operator": "lte", |
|
|
|
|
"value": end_date.strftime(SIMPLE_DATETIME_FORMAT), |
|
|
|
|
"value": end_date.strftime(DateFormats.SIMPLE_DATETIME.value), |
|
|
|
|
}) |
|
|
|
|
request = self.session.post(self.baseurl + "Entry/GetEntries", json=filters) |
|
|
|
|
request = self.session.post(self.baseurl + "Entry/GetEntries", json=filters, timeout=2) |
|
|
|
|
request.raise_for_status() |
|
|
|
|
return request.json()["data"] |
|
|
|
|
|
|
|
|
@ -241,7 +248,7 @@ class MobatimeApi: |
|
|
|
|
|
|
|
|
|
:return: dict of the info mentioned above |
|
|
|
|
""" |
|
|
|
|
request = self.session.get(self.baseurl + "Tracking/GetTrackingData") |
|
|
|
|
request = self.session.get(self.baseurl + "Tracking/GetTrackingData", timeout=2) |
|
|
|
|
request.raise_for_status() |
|
|
|
|
return request.json() |
|
|
|
|
|
|
|
|
@ -254,7 +261,7 @@ class MobatimeApi: |
|
|
|
|
|
|
|
|
|
:return: list with mentioned infos |
|
|
|
|
""" |
|
|
|
|
request = self.session.get(self.baseurl + "Employee/GetAccountInformation") |
|
|
|
|
request = self.session.get(self.baseurl + "Employee/GetAccountInformation", timeout=2) |
|
|
|
|
request.raise_for_status() |
|
|
|
|
return request.json() |
|
|
|
|
|
|
|
|
@ -269,30 +276,38 @@ class TimeBot: |
|
|
|
|
:param datetime.datetime punch_datetime: datetime object |
|
|
|
|
:raises: on status code != 2xx |
|
|
|
|
""" |
|
|
|
|
self.mobatime_api.save_entry(punch_datetime, COMING_ENTRY_CODE_ID, note="da").raise_for_status() |
|
|
|
|
self.mobatime_api.save_entry(punch_datetime, PunchCodes.COMING.value, note="da").raise_for_status() |
|
|
|
|
|
|
|
|
|
def punch_out(self, punch_datetime: datetime.datetime): |
|
|
|
|
""" |
|
|
|
|
:param datetime.datetime punch_datetime: datetime object |
|
|
|
|
:raises: on status code != 2xx |
|
|
|
|
""" |
|
|
|
|
self.mobatime_api.save_entry(punch_datetime, LEAVING_ENTRY_CODE_ID, note="weg").raise_for_status() |
|
|
|
|
self.mobatime_api.save_entry(punch_datetime, PunchCodes.LEAVING.value, note="weg").raise_for_status() |
|
|
|
|
|
|
|
|
|
def break_start(self, punch_datetime: datetime.datetime): |
|
|
|
|
""" |
|
|
|
|
:param datetime.datetime punch_datetime: datetime object |
|
|
|
|
:raises: on status code != 2xx |
|
|
|
|
""" |
|
|
|
|
self.mobatime_api.save_entry(punch_datetime, BREAK_START_ENTRY_CODE_ID, note="pause").raise_for_status() |
|
|
|
|
self.mobatime_api.save_entry(punch_datetime, PunchCodes.BREAK_START.value, note="pause").raise_for_status() |
|
|
|
|
|
|
|
|
|
def break_end(self, punch_datetime: datetime.datetime): |
|
|
|
|
""" |
|
|
|
|
:param datetime.datetime punch_datetime: datetime object |
|
|
|
|
:raises: on status code != 2xx |
|
|
|
|
""" |
|
|
|
|
self.mobatime_api.save_entry(punch_datetime, BREAK_END_ENTRY_CODE_ID, note="pause ende").raise_for_status() |
|
|
|
|
self.mobatime_api.save_entry(punch_datetime, PunchCodes.BREAK_END.value, note="pause ende").raise_for_status() |
|
|
|
|
|
|
|
|
|
def smart_punch(self, punch_datetime: datetime.datetime) -> None: |
|
|
|
|
""" |
|
|
|
|
Perform smart punch for today. |
|
|
|
|
|
|
|
|
|
def smart_punch(self, punch_datetime): |
|
|
|
|
This tries to detect your last action and will create entries in to following order: |
|
|
|
|
punch_in -> break_start -> break_end -> punch_out |
|
|
|
|
|
|
|
|
|
:param datetime.datetime punch_datetime: a datetime object to identify the current day aka. today |
|
|
|
|
""" |
|
|
|
|
last_punch = self.mobatime_api.get_entries( |
|
|
|
|
1, |
|
|
|
|
start_date=punch_datetime.replace(hour=0, minute=0, second=0, microsecond=0), |
|
|
|
@ -302,16 +317,16 @@ class TimeBot: |
|
|
|
|
if not last_punch or last_punch[0]["entryNumber"] is None: |
|
|
|
|
self.logger.debug("could not detect any time entry for today... punching in") |
|
|
|
|
method = "punch_in" |
|
|
|
|
elif last_punch[0]["entryNumber"] == COMING_ENTRY_CODE_ID: |
|
|
|
|
elif last_punch[0]["entryNumber"] == PunchCodes.COMING.value: |
|
|
|
|
self.logger.debug("your last entry was `punch_in`... starting break") |
|
|
|
|
method = "break_start" |
|
|
|
|
elif last_punch[0]["entryNumber"] == BREAK_START_ENTRY_CODE_ID: |
|
|
|
|
elif last_punch[0]["entryNumber"] == PunchCodes.BREAK_START.value: |
|
|
|
|
self.logger.debug("your last entry was `break_start`... ending break") |
|
|
|
|
method = "break_end" |
|
|
|
|
elif last_punch[0]["entryNumber"] == BREAK_END_ENTRY_CODE_ID: |
|
|
|
|
elif last_punch[0]["entryNumber"] == PunchCodes.BREAK_END.value: |
|
|
|
|
self.logger.debug("your last entry was `break_end`... punching out") |
|
|
|
|
method = "punch_out" |
|
|
|
|
elif last_punch[0]["entryNumber"] == LEAVING_ENTRY_CODE_ID: |
|
|
|
|
elif last_punch[0]["entryNumber"] == PunchCodes.LEAVING.value: |
|
|
|
|
self.logger.error("your last entry was `punch_out`... punching in again with this command is not supported") |
|
|
|
|
sys.exit(1) |
|
|
|
|
if method is None: |
|
|
|
@ -320,7 +335,63 @@ class TimeBot: |
|
|
|
|
exit(1) |
|
|
|
|
self.logger.info("running `{}` with date `{}` and time `{}`".format( |
|
|
|
|
method, |
|
|
|
|
punch_datetime.strftime(SIMPLE_DATE_FORMAT), |
|
|
|
|
punch_datetime.strftime(SIMPLE_TIME_FORMAT), |
|
|
|
|
punch_datetime.strftime(DateFormats.SIMPLE_DATE.value), |
|
|
|
|
punch_datetime.strftime(DateFormats.SIMPLE_TIME.value), |
|
|
|
|
)) |
|
|
|
|
getattr(self, method)(punch_datetime) |
|
|
|
|
|
|
|
|
|
def status(self) -> str: |
|
|
|
|
""" |
|
|
|
|
Get and format Mobatime tracking data and and account information. |
|
|
|
|
The output ist formatted to print it to the console. |
|
|
|
|
|
|
|
|
|
:return: preformatted status text |
|
|
|
|
""" |
|
|
|
|
tracking_data = self.mobatime_api.get_tracking_data() |
|
|
|
|
account_info = self.mobatime_api.get_account_information() |
|
|
|
|
ret = "Tracking Info:\n" |
|
|
|
|
ret += f" Current State: {tracking_data['actualState']}\n" |
|
|
|
|
ret += f" Last Booking: {tracking_data['lastBooking']}\n" |
|
|
|
|
ret += f" Last Booking Date: {tracking_data['lastBookingDate']}\n" |
|
|
|
|
ret += "Account Infos:\n" |
|
|
|
|
for i in account_info: |
|
|
|
|
ret += f" {i['accountName']}: {i['value']}\n" |
|
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
def get_hours_present(self) -> datetime.timedelta: |
|
|
|
|
""" |
|
|
|
|
Calculate today's hours present. |
|
|
|
|
|
|
|
|
|
:return: timedelta object with today's hours present |
|
|
|
|
""" |
|
|
|
|
now = datetime.datetime.now() |
|
|
|
|
punches = self.mobatime_api.get_entries( |
|
|
|
|
None, |
|
|
|
|
start_date=now.replace(hour=0, minute=0, second=0, microsecond=0), |
|
|
|
|
end_date=now.replace(hour=0, minute=0, second=0, microsecond=0), |
|
|
|
|
) |
|
|
|
|
punch_out_time = now |
|
|
|
|
if punches and punches[0]["entryNumber"] == PunchCodes.LEAVING.value: |
|
|
|
|
punch_out_time = datetime.datetime.strptime(punches.pop(0)["dateTime"], DateFormats.FULL_DATETIME.value) |
|
|
|
|
|
|
|
|
|
punch_in_time = None |
|
|
|
|
if punches and punches[-1]["entryNumber"] == PunchCodes.COMING.value: |
|
|
|
|
punch_in_time = datetime.datetime.strptime(punches.pop(-1)["dateTime"], DateFormats.FULL_DATETIME.value) |
|
|
|
|
|
|
|
|
|
if punch_in_time is None: |
|
|
|
|
return datetime.timedelta(seconds=0) |
|
|
|
|
|
|
|
|
|
breaks = [] |
|
|
|
|
end = now |
|
|
|
|
for i in punches: |
|
|
|
|
if i["entryNumber"] == PunchCodes.BREAK_END.value: |
|
|
|
|
end = datetime.datetime.strptime(i["dateTime"], DateFormats.FULL_DATETIME.value) |
|
|
|
|
if i["entryNumber"] == PunchCodes.BREAK_START.value: |
|
|
|
|
start = datetime.datetime.strptime(i["dateTime"], DateFormats.FULL_DATETIME.value) |
|
|
|
|
breaks.append(end - start) |
|
|
|
|
end = now |
|
|
|
|
|
|
|
|
|
time_delta = punch_out_time - punch_in_time |
|
|
|
|
for i in breaks: |
|
|
|
|
time_delta = time_delta - i |
|
|
|
|
return time_delta |
|
|
|
|