Title: Job Monitoring - Admin Version
Date: July 9th 2020
Description:
A simple tutorial on job monitoring, intended for site admins only.
Topics covered:
Capturing information about jobs: execution time, queue time, per-job breakdowns, sorting, and plots that show the job ID on hover
# Install specific packages required for this notebook
!pip install flywheel-sdk pandas
# Import packages
from getpass import getpass
import logging
import os
import datetime
import time
import pprint
from dateutil.tz import tzutc
from IPython.display import display, Image
import flywheel
from permission import check_user_permission
import numpy as np
from tqdm import tqdm
import statistics as stats
from scipy import stats as st
import matplotlib.pyplot as plt
from scipy.stats import normaltest
# Configure root logging once for the whole notebook (INFO level, timestamped
# lines) and keep a handle on the logger used by every later cell.
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
)
log = logging.getLogger(name='root')
Get an API key. More on this in the Flywheel SDK documentation here.
# Prompt for the key without echoing it to the notebook output.
API_KEY = getpass(prompt='Enter API_KEY here: ')
Instantiate the Flywheel API client
# Use the typed key; if nothing was entered, fall back to the FW_KEY
# environment variable.
fw = flywheel.Client(API_KEY if API_KEY else os.environ.get('FW_KEY'))
# Remove the plaintext key from the notebook namespace.
del API_KEY
Show Flywheel logging information
# Report which account and which site API the client is talking to.
log.info(
    'You are now logged in as %s to %s',
    fw.get_current_user()['email'],
    fw.get_config()['site']['api_url'],
)
Before we start, we verify that you have the permissions required to proceed with this notebook.
# Minimum permissions required by this notebook: site-admin role, read-only
# access on the group, and job view / run-cancel / cancel-any rights on projects.
min_reqs = {
    "site": "site_admin",
    "group": "ro",
    "project": ['jobs_view',
                'jobs_run_cancel',
                'jobs_cancel_any']
}
# Verify the current user meets min_reqs before running any job operations.
# NOTE(review): assumes the helper is check_user_permission(fw_client, min_reqs, ...);
# the group/project requirements may need explicit group=/project= arguments — confirm.
check_user_permission(fw, min_reqs)
First, we will show you how to find jobs that you have run previously.
In the example below, we retrieve 2 jobs that you have launched on your instance. You can change the number of jobs returned by modifying the `limit`
argument.
# Find (at most two) jobs whose origin.id matches the current user's email.
user_id = fw.get_current_user()['email']
user_jobs = fw.jobs.find('origin.id={}'.format(user_id), limit='2')
pprint.pprint(user_jobs)
You can also search for jobs launched by other users.
# Jobs launched by another user, identified by email address.
sample_id = input("Please enter the user's email address that you wished to search for: ")
user_jobs = fw.jobs.find('origin.id=' + sample_id, limit='2')
pprint.pprint(user_jobs)

# Jobs run with a given gear.
gear_name = 'mriqc'
mriqc_jobs = fw.jobs.find('gear_info.name={}'.format(gear_name), limit='2')
pprint.pprint(mriqc_jobs)

# Jobs created after a given date.
created_by = '2020-07-01'
filtered_jobs = fw.jobs.find('created>' + created_by, limit='2')
pprint.pprint(filtered_jobs)

# Jobs in a given state.
state = 'complete'
filtered_jobs = fw.jobs.find('state={}'.format(state), limit='2')
pprint.pprint(filtered_jobs)
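Use the `update` method to cancel jobs that are pending.
Note that the example below cancels whichever pending jobs the query returns (up to the `limit`), not only your own.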
# Cancel up to two pending jobs by moving them to the 'cancelled' state.
filtered_jobs = fw.jobs.find('state=pending', limit='2')
for job in filtered_jobs:
    job.update(state='cancelled')
You can also restart a job whose state is `failed`. However, each job can only be retried once.
To demonstrate, we will restart failed `mriqc` jobs by iterating through the `user_jobs` list that we defined earlier with the `fw.jobs.find()` method.
We use exception handling so that an attempt to restart a job more than once does not stop the loop.
Once a job has been successfully restarted, a new `job_id` is returned. We append this new `job_id` to a list named `retried_job`.
# Retry up to two failed mriqc jobs from `user_jobs`, collecting the IDs that
# fw.retry_job returns for the restarted jobs.
retried_job = list()
for job in user_jobs:
    # Cap the demo at two retries.
    if len(retried_job) >= 2:
        break
    if job.state != 'failed' or job.gear_info['name'] != 'mriqc':
        continue
    try:
        new_job_id = fw.retry_job(job.id)
    except flywheel.ApiException as exc:
        # Each job can only be retried once, so the API may refuse the retry.
        # Report it and move on instead of silently swallowing every error
        # (the previous bare `except` also hid genuine bugs).
        log.warning('Could not retry job %s: %s', job.id, exc)
        continue
    retried_job.append(new_job_id)
# View the job ID that has been retried
retried_job
In this section, we will present an example of calculating, plotting and then using job statistics for the purpose of cancelling jobs that take too long.
To give you an overview, you can use the `fw.get_jobs_stats()` method to view the status of all current jobs within the Flywheel instance.
# Display job statistics for the whole Flywheel instance; as the last
# expression in the cell, the returned value is shown as the cell output.
fw.get_jobs_stats()
Before getting started, we define a few values: the gear name, the date after which jobs must have been created, and the sample size.
def validate(date_text):
    """Check that ``date_text`` is a date in ``YYYY-MM-DD`` format.

    Logs a hint to continue to the next cell when the date is valid.

    Args:
        date_text: The date string entered by the user.

    Returns:
        The parsed ``datetime.datetime`` (callers may ignore it).

    Raises:
        ValueError: If ``date_text`` does not match ``YYYY-MM-DD``; the
            underlying parsing error is chained as ``__cause__``.
    """
    try:
        parsed = datetime.datetime.strptime(date_text, '%Y-%m-%d')
    except ValueError as err:
        # Chain the original error so the precise parse failure stays visible.
        raise ValueError("Incorrect date format, should be YYYY-MM-DD") from err
    log.info('Please proceed to the next cell')
    return parsed
# Stray whitespace would otherwise end up inside the job filter strings.
GEAR_NAME = input('Please enter the gear that you wish to print out the information about: ').strip()
CREATED_BY = input('Please enter the date you wish to filter by in this format (yyyy-mm-dd): ').strip()
# This value is used as the `limit` of job queries, so parse and validate it
# here instead of passing raw text along.
try:
    MAX_SAMPLE_SIZE = int(input('Please enter the max number of jobs you want to analyze: '))
except ValueError as err:
    raise ValueError('The max number of jobs must be a whole number') from err
if MAX_SAMPLE_SIZE < 1:
    raise ValueError('The max number of jobs must be at least 1')
# Verify if you have entered the right date format
validate(CREATED_BY)
def plot(fw_client, gear_name, created_by, sample_size):
    """Plot run times of completed jobs of a gear and derive a run-time cutoff.

    Fetches up to ``sample_size`` completed jobs of ``gear_name`` created after
    ``created_by`` and measures each run time in minutes (from the ``running``
    to the ``complete`` transition). It then plots a histogram, prints summary
    statistics and picks a cutoff above which a running job is considered too
    long.

    Args:
        fw_client: Flywheel client used to query jobs.
        gear_name: Name of the gear whose jobs are analysed.
        created_by: Only jobs created after this date (``YYYY-MM-DD``) are used.
        sample_size: Maximum number of jobs to fetch (passed as ``limit``).

    Returns:
        The run-time cutoff in minutes, or ``None`` when no completed job
        with both timestamps was found.
    """
    run_times = list()
    query = f'gear_info.name={gear_name},state="complete",created>{created_by}'
    for job in tqdm(fw_client.jobs.find(query, limit=sample_size)):
        transitions = fw_client.get_job(job.id).transitions
        started = getattr(transitions, 'running', None)
        finished = getattr(transitions, 'complete', None)
        # Without both timestamps there is no run time to measure; skip the
        # job rather than aborting the whole analysis.
        if started is None or finished is None:
            continue
        run_times.append((finished - started).total_seconds() / 60)

    if not run_times:
        log.warning('No completed %s jobs with run times were found; no cutoff computed', gear_name)
        return None

    plt.hist(run_times)
    plt.title(f'{gear_name} run times in minutes')
    plt.show()

    max_run_time = max(run_times)
    min_run_time = min(run_times)
    run_time_range = max_run_time - min_run_time
    mu = stats.mean(run_times)
    # statistics.stdev raises StatisticsError for fewer than two data points.
    sd = stats.stdev(run_times) if len(run_times) > 1 else 0.0

    # Determine a run_time_cutoff.
    # normaltest's null hypothesis is that the sample IS normal, so a small
    # p-value means normality is rejected (the previous check had this
    # backwards). SciPy's skewtest, used inside normaltest, needs at least 8
    # samples; with fewer, treat the distribution as not normal and fall back
    # to the max-based cutoff.
    if len(run_times) >= 8:
        s, pval = normaltest(run_times)
    else:
        s, pval = float('nan'), float('nan')
    if pval >= 0.01:
        print(f's = {s:.2f}. Distribution is normal (enough)... Using 2*sd + mu a cutoff')
        run_time_cutoff = 2*sd + mu
    else:
        print(f's = {s:.2f}. Distribution is not normal (enough)... Using max time + 1sd as a cutoff')
        run_time_cutoff = max_run_time + 1*sd
    print(f'range={run_time_range:.2f}\nmu = {mu:.2f}\nsd = {sd:.2f}\ncut off = {run_time_cutoff:.2f}')
    # Returned so the monitoring loop can use it.
    return run_time_cutoff
# Keep the cutoff (minutes) that `plot` computes; the loop below needs it.
# It may be None when no cutoff could be derived.
run_time_cutoff = plot(fw, GEAR_NAME, CREATED_BY, MAX_SAMPLE_SIZE)
if run_time_cutoff is None:
    log.warning('No run-time cutoff available; running jobs will be reported but not cancelled')
sleep_time = 1 # Amount of time (in min) to sleep between checks
# Poll forever; interrupt the kernel to stop monitoring.
while True:
    print(f"==============================\n{datetime.datetime.now()}\n==============================\n")
    num_pending = len(fw.jobs.find(f'state=pending,created>{CREATED_BY},gear_info.name={GEAR_NAME}', limit=MAX_SAMPLE_SIZE))
    print(f'{num_pending} pending {GEAR_NAME} jobs')
    running_jobs = fw.jobs.find(f'state=running,created>{CREATED_BY},gear_info.name={GEAR_NAME}', limit=MAX_SAMPLE_SIZE)
    # Report how many jobs are running, not the job objects themselves.
    print(f'{len(running_jobs)} running {GEAR_NAME} jobs\n')
    for j in running_jobs:
        job = fw.get_job(j.id)
        time_delta = datetime.datetime.now(tz=tzutc()) - job.transitions.running
        run_time_min = time_delta.total_seconds()/60
        print('{} running for {:.2f} min'.format(job.id, run_time_min))
        if run_time_cutoff is not None and run_time_min > run_time_cutoff:
            # Actually cancel the job; previously this was only reported.
            try:
                job.update(state='cancelled')
            except flywheel.ApiException as exc:
                # Best effort: report the failure and keep monitoring other jobs.
                log.warning('Could not cancel job %s: %s', job.id, exc)
                continue
            print(f"{job.id} running for {run_time_min} -- cancelled as it is more than the cutoff of {run_time_cutoff}")
    print(f'Sleeping {sleep_time} min...')
    time.sleep(60*sleep_time)