diff --git a/utils/extractor.py b/utils/extractor.py index 66622d4..a8e9057 100644 --- a/utils/extractor.py +++ b/utils/extractor.py @@ -4,7 +4,7 @@ from py7zr import SevenZipFile, exceptions BAD_DIR_NAME = '__BAD__' -def mark_file_as_BAD(file, bad_exception): +def mark_file_as_BAD(file: str, bad_exception: Exception) -> None: try: filename = os.path.basename(file) bad_dir = os.path.join(os.path.dirname(file), BAD_DIR_NAME) @@ -16,7 +16,7 @@ def mark_file_as_BAD(file, bad_exception): print(f'[Error] {e}') -def extract_zip(zip_file, target_dir): +def extract_zip(zip_file: str, target_dir: str) -> None: try: with zipfile.ZipFile(zip_file, 'r') as zip_ref: members = [ m for m in zip_ref.infolist() if "__MACOSX" not in m.filename ] @@ -25,10 +25,10 @@ def extract_zip(zip_file, target_dir): except zipfile.BadZipfile as e: mark_file_as_BAD(zip_file, e) except Exception as e: - print(f'[ERROR] Something went wrong while extracting zip contents. Check the error message, get student id and download / organise manually\nError message: {e}') + print(f'[ERROR] Something went wrong while extracting the contents of a submitted zip file. Check the error message, get student id and download / organise manually\nError message: {e}') -def extract_rar(rar_file, target_dir): +def extract_rar(rar_file: str, target_dir: str) -> None: try: with rarfile.RarFile(rar_file, 'r') as rar_ref: if platform.system() == 'Windows': @@ -48,7 +48,7 @@ def extract_rar(rar_file, target_dir): exit() -def extract_7z(seven_zip_file, target_dir): +def extract_7z(seven_zip_file: str, target_dir: str) -> None: try: # extract the 7z file using py7zr with open(seven_zip_file, 'rb') as f: seven_zip = SevenZipFile(seven_zip_file, mode='r') @@ -64,7 +64,7 @@ def extract_7z(seven_zip_file, target_dir): mark_file_as_BAD(seven_zip_file, e) -def extract_file_to_dir(file_path, student_dir): +def extract_file_to_dir(file_path: str, student_dir: str) -> None: os.makedirs(student_dir, exist_ok=True) # create the subdirectory for student if file_path.lower().endswith('.zip'): diff --git a/utils/inspector.py b/utils/inspector.py index 340e814..e52ad22 100644 --- a/utils/inspector.py +++ b/utils/inspector.py @@ -16,6 +16,7 @@ def load_excluded_filenames(submissions_dir_name: str) -> list[str]: # helper f try: df = pd.read_csv(csv_file_path) filename_list = df['exclude_filename'].tolist() # get the values of the 'filename' column as a list + filename_list = [ f.lower() for f in filename_list ] # convert to lowercase for comparison with submission files print(f'[INFO] Using CSV file with list of excluded file names: {csv_file_path}') return filename_list except Exception as e: # any exception, print error and return empty list to continue without any excluded file names @@ -28,7 +29,7 @@ def get_hashes_in_dir(dir_path: str, excluded_filenames: list = []) -> list: # hash_list = [] for subdir, dirs, files in os.walk(dir_path): # loop through all files in the directory and generate hashes for filename in files: - if filename not in excluded_filenames: # do not hash for inspection file names in the excluded list + if filename.lower() not in excluded_filenames: # convert to lowercase for comparison with excluded files & do not hash if in the excluded list filepath = os.path.join(subdir, filename) with open(filepath, 'rb') as f: filehash = hashlib.sha256(f.read()).hexdigest() @@ -62,10 +63,16 @@ def inspect_for_duplicate_hashes(hashes_csv_file_path: str): # main function fo csv = pd.read_csv(hashes_csv_file_path) df = pd.DataFrame(csv) # df with all files and their hashes drop_columns = ['filepath', 'filename'] # only need to keep 'student id' and 'sha256 hash' for groupby later - df = df.drop(columns=drop_columns) # clear not needed columns - duplicate_hash = df.loc[df.duplicated(subset=['sha256 hash'], keep=False), :] # all files with duplicate hash - incl. files from the same student id - hash_with_multiple_student_ids = duplicate_hash.groupby('sha256 hash').agg(lambda x: len(x.unique())>1) # true if more than 1 unique student ids (= files with the same hash by multiple student ids), false if unique student id (= files from the same student id with the same hash) - suspicious_hashes_list = hash_with_multiple_student_ids[hash_with_multiple_student_ids['Student ID']==True].index.to_list() # list with duplicate hashes - only if different student id (doesn't include files from same student id) + df_clean = df.drop(columns=drop_columns) # clear not needed columns + duplicate_hash = df_clean.loc[df_clean.duplicated(subset=['sha256 hash'], keep=False), :] # all files with duplicate hash - incl. files from the same student id + + # agg() for 'Student ID' True if more than 1 in groupby (= files with the same hash by multiple student ids) + # False if unique (= files from the same student id with the same hash) + hash_with_multiple_student_ids = duplicate_hash.groupby('sha256 hash').agg(lambda x: len(x.unique())>1) + + # list with duplicate hashes - only if different student id (doesn't include files from same student id) + suspicious_hashes_list = hash_with_multiple_student_ids[hash_with_multiple_student_ids['Student ID']==True].index.to_list() + files_with_suspicious_hash = df[df['sha256 hash'].isin(suspicious_hashes_list)] # df with all files with duplicate/suspicious hash, excludes files from the same student id df_suspicious = files_with_suspicious_hash.sort_values(['sha256 hash', 'Student ID']) # sort before output to csv diff --git a/utils/organiser.py b/utils/organiser.py index 6a30bac..f1d1371 100644 --- a/utils/organiser.py +++ b/utils/organiser.py @@ -4,7 +4,7 @@ from utils.extractor import extract_file_to_dir BAD_DIR_NAME = '__BAD__' -def validate_gradebook_dir_name(src_dir): +def validate_gradebook_dir_name(src_dir: str) -> None: if not os.path.isdir(src_dir): # check if it exists and is a directory print(f"\n[Error] Incorrect directory: {src_dir}\n[Info] Make sure the directory exists in 'BB_gradebooks'") exit() @@ -16,7 +16,7 @@ def validate_gradebook_dir_name(src_dir): exit() -def get_comment_from_submission_txt(file_path): +def get_comment_from_submission_txt(file_path: str) -> str | None: no_comment_text = f'Comments:\nThere are no student comments for this assignment.' no_comment_text_regex = no_comment_text no_comment_regex_compile = re.compile(no_comment_text_regex) @@ -30,9 +30,10 @@ def get_comment_from_submission_txt(file_path): match = str(match).replace('\\n', '').replace('[','').replace(']','').replace('"','') match = str(match).split('Comments:')[-1] return match + return None -def get_gradebook_stats(src_dir): +def get_gradebook_stats(src_dir: str) -> dict[str, int]: all_files = [ os.path.join(src_dir, f) for f in os.listdir(src_dir) if BAD_DIR_NAME not in f ] dirs = [ f for f in all_files if os.path.isdir(f) and BAD_DIR_NAME not in f ] normal_files = [ f for f in all_files if os.path.isfile(f) ] @@ -57,7 +58,7 @@ def get_gradebook_stats(src_dir): return files_counter -def organise_file_per_student(src_dir, dest_dir, file_name, student_no): +def organise_file_per_student(src_dir: str, dest_dir: str, file_name: str, student_no: str) -> None: student_dir = os.path.join(dest_dir, student_no) os.makedirs(student_dir, exist_ok=True) # create student directory if it doesn't exist file_path = os.path.join(src_dir, file_name) @@ -80,7 +81,7 @@ def organise_file_per_student(src_dir, dest_dir, file_name, student_no): shutil.move(file_path, new_file_path) # move the file to student directory -def organise_gradebook(src_dir, dest_dir): +def organise_gradebook(src_dir: str, dest_dir: str) -> None: """1) extracts .zip, .rar, .7z files, organises contents into directories per student number, and deletes compressed files after successful extraction 2) organises all other files in gradebook into directories per student number 3) checks if there are any comments in submission text files and extracts them into a file @@ -88,7 +89,7 @@ def organise_gradebook(src_dir, dest_dir): validate_gradebook_dir_name(src_dir) # check if dir exists, and has files in it - exits if not os.makedirs(dest_dir, exist_ok=True) # create the destination directory if it doesn't exist files_counter = get_gradebook_stats(src_dir) # print stats about the files in gradebook and get files_counter dict to use later - students_numbers = [] # list to add and count unique student numbers from all files in gradebook + students_numbers: list[str] = [] # list to add and count unique student numbers from all files in gradebook print('\nStart organising...\n') for file_name in os.listdir(src_dir): # iterate through all files in the directory if BAD_DIR_NAME not in file_name: # ignore dir BAD_DIR_NAME (created after first run if corrupt compressed files found) @@ -107,11 +108,11 @@ def organise_gradebook(src_dir, dest_dir): print(f'[Note] Compressed files (.zip, .rar, .7z) are automatically deleted from the gradebook directory after successful extraction') -def check_submissions_dir_for_compressed(submissions_dir): +def check_submissions_dir_for_compressed(submissions_dir: str) -> None: """checks if any submitted compressed files contain more compressed files inside (they are not recursively extracted) \nprints any compressed files location that need to be extracted manually """ - compressed_files = [] + compressed_files: list[str] = [] abs_path = os.getcwd() for the_path, dirc, files in os.walk(submissions_dir): for fname in files: