added typing & more code cleanup
This commit is contained in:
@@ -4,7 +4,7 @@ from py7zr import SevenZipFile, exceptions
|
|||||||
|
|
||||||
BAD_DIR_NAME = '__BAD__'
|
BAD_DIR_NAME = '__BAD__'
|
||||||
|
|
||||||
def mark_file_as_BAD(file, bad_exception):
|
def mark_file_as_BAD(file: str, bad_exception: Exception) -> None:
|
||||||
try:
|
try:
|
||||||
filename = os.path.basename(file)
|
filename = os.path.basename(file)
|
||||||
bad_dir = os.path.join(os.path.dirname(file), BAD_DIR_NAME)
|
bad_dir = os.path.join(os.path.dirname(file), BAD_DIR_NAME)
|
||||||
@@ -16,7 +16,7 @@ def mark_file_as_BAD(file, bad_exception):
|
|||||||
print(f'[Error] {e}')
|
print(f'[Error] {e}')
|
||||||
|
|
||||||
|
|
||||||
def extract_zip(zip_file, target_dir):
|
def extract_zip(zip_file: str, target_dir: str) -> None:
|
||||||
try:
|
try:
|
||||||
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
|
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
|
||||||
members = [ m for m in zip_ref.infolist() if "__MACOSX" not in m.filename ]
|
members = [ m for m in zip_ref.infolist() if "__MACOSX" not in m.filename ]
|
||||||
@@ -25,10 +25,10 @@ def extract_zip(zip_file, target_dir):
|
|||||||
except zipfile.BadZipfile as e:
|
except zipfile.BadZipfile as e:
|
||||||
mark_file_as_BAD(zip_file, e)
|
mark_file_as_BAD(zip_file, e)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'[ERROR] Something went wrong while extracting zip contents. Check the error message, get student id and download / organise manually\nError message: {e}')
|
print(f'[ERROR] Something went wrong while extracting the contents of a submitted zip file. Check the error message, get student id and download / organise manually\nError message: {e}')
|
||||||
|
|
||||||
|
|
||||||
def extract_rar(rar_file, target_dir):
|
def extract_rar(rar_file: str, target_dir: str) -> None:
|
||||||
try:
|
try:
|
||||||
with rarfile.RarFile(rar_file, 'r') as rar_ref:
|
with rarfile.RarFile(rar_file, 'r') as rar_ref:
|
||||||
if platform.system() == 'Windows':
|
if platform.system() == 'Windows':
|
||||||
@@ -48,7 +48,7 @@ def extract_rar(rar_file, target_dir):
|
|||||||
exit()
|
exit()
|
||||||
|
|
||||||
|
|
||||||
def extract_7z(seven_zip_file, target_dir):
|
def extract_7z(seven_zip_file: str, target_dir: str) -> None:
|
||||||
try: # extract the 7z file using py7zr
|
try: # extract the 7z file using py7zr
|
||||||
with open(seven_zip_file, 'rb') as f:
|
with open(seven_zip_file, 'rb') as f:
|
||||||
seven_zip = SevenZipFile(seven_zip_file, mode='r')
|
seven_zip = SevenZipFile(seven_zip_file, mode='r')
|
||||||
@@ -64,7 +64,7 @@ def extract_7z(seven_zip_file, target_dir):
|
|||||||
mark_file_as_BAD(seven_zip_file, e)
|
mark_file_as_BAD(seven_zip_file, e)
|
||||||
|
|
||||||
|
|
||||||
def extract_file_to_dir(file_path, student_dir):
|
def extract_file_to_dir(file_path: str, student_dir: str) -> None:
|
||||||
os.makedirs(student_dir, exist_ok=True) # create the subdirectory for student
|
os.makedirs(student_dir, exist_ok=True) # create the subdirectory for student
|
||||||
|
|
||||||
if file_path.lower().endswith('.zip'):
|
if file_path.lower().endswith('.zip'):
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ def load_excluded_filenames(submissions_dir_name: str) -> list[str]: # helper f
|
|||||||
try:
|
try:
|
||||||
df = pd.read_csv(csv_file_path)
|
df = pd.read_csv(csv_file_path)
|
||||||
filename_list = df['exclude_filename'].tolist() # get the values of the 'filename' column as a list
|
filename_list = df['exclude_filename'].tolist() # get the values of the 'filename' column as a list
|
||||||
|
filename_list = [ f.lower() for f in filename_list ] # convert to lowercase for comparison with submission files
|
||||||
print(f'[INFO] Using CSV file with list of excluded file names: {csv_file_path}')
|
print(f'[INFO] Using CSV file with list of excluded file names: {csv_file_path}')
|
||||||
return filename_list
|
return filename_list
|
||||||
except Exception as e: # any exception, print error and return empty list to continue without any excluded file names
|
except Exception as e: # any exception, print error and return empty list to continue without any excluded file names
|
||||||
@@ -28,7 +29,7 @@ def get_hashes_in_dir(dir_path: str, excluded_filenames: list = []) -> list: #
|
|||||||
hash_list = []
|
hash_list = []
|
||||||
for subdir, dirs, files in os.walk(dir_path): # loop through all files in the directory and generate hashes
|
for subdir, dirs, files in os.walk(dir_path): # loop through all files in the directory and generate hashes
|
||||||
for filename in files:
|
for filename in files:
|
||||||
if filename not in excluded_filenames: # do not hash for inspection file names in the excluded list
|
if filename.lower() not in excluded_filenames: # convert to lowercase for comparison with excluded files & do not hash if in the excluded list
|
||||||
filepath = os.path.join(subdir, filename)
|
filepath = os.path.join(subdir, filename)
|
||||||
with open(filepath, 'rb') as f:
|
with open(filepath, 'rb') as f:
|
||||||
filehash = hashlib.sha256(f.read()).hexdigest()
|
filehash = hashlib.sha256(f.read()).hexdigest()
|
||||||
@@ -62,10 +63,16 @@ def inspect_for_duplicate_hashes(hashes_csv_file_path: str): # main function fo
|
|||||||
csv = pd.read_csv(hashes_csv_file_path)
|
csv = pd.read_csv(hashes_csv_file_path)
|
||||||
df = pd.DataFrame(csv) # df with all files and their hashes
|
df = pd.DataFrame(csv) # df with all files and their hashes
|
||||||
drop_columns = ['filepath', 'filename'] # only need to keep 'student id' and 'sha256 hash' for groupby later
|
drop_columns = ['filepath', 'filename'] # only need to keep 'student id' and 'sha256 hash' for groupby later
|
||||||
df = df.drop(columns=drop_columns) # clear not needed columns
|
df_clean = df.drop(columns=drop_columns) # clear not needed columns
|
||||||
duplicate_hash = df.loc[df.duplicated(subset=['sha256 hash'], keep=False), :] # all files with duplicate hash - incl. files from the same student id
|
duplicate_hash = df_clean.loc[df_clean.duplicated(subset=['sha256 hash'], keep=False), :] # all files with duplicate hash - incl. files from the same student id
|
||||||
hash_with_multiple_student_ids = duplicate_hash.groupby('sha256 hash').agg(lambda x: len(x.unique())>1) # true if more than 1 unique student ids (= files with the same hash by multiple student ids), false if unique student id (= files from the same student id with the same hash)
|
|
||||||
suspicious_hashes_list = hash_with_multiple_student_ids[hash_with_multiple_student_ids['Student ID']==True].index.to_list() # list with duplicate hashes - only if different student id (doesn't include files from same student id)
|
# agg() for 'Student ID' True if more than 1 in groupby (= files with the same hash by multiple student ids)
|
||||||
|
# False if unique (= files from the same student id with the same hash)
|
||||||
|
hash_with_multiple_student_ids = duplicate_hash.groupby('sha256 hash').agg(lambda x: len(x.unique())>1)
|
||||||
|
|
||||||
|
# list with duplicate hashes - only if different student id (doesn't include files from same student id)
|
||||||
|
suspicious_hashes_list = hash_with_multiple_student_ids[hash_with_multiple_student_ids['Student ID']==True].index.to_list()
|
||||||
|
|
||||||
files_with_suspicious_hash = df[df['sha256 hash'].isin(suspicious_hashes_list)] # df with all files with duplicate/suspicious hash, excludes files from the same student id
|
files_with_suspicious_hash = df[df['sha256 hash'].isin(suspicious_hashes_list)] # df with all files with duplicate/suspicious hash, excludes files from the same student id
|
||||||
df_suspicious = files_with_suspicious_hash.sort_values(['sha256 hash', 'Student ID']) # sort before output to csv
|
df_suspicious = files_with_suspicious_hash.sort_values(['sha256 hash', 'Student ID']) # sort before output to csv
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from utils.extractor import extract_file_to_dir
|
|||||||
BAD_DIR_NAME = '__BAD__'
|
BAD_DIR_NAME = '__BAD__'
|
||||||
|
|
||||||
|
|
||||||
def validate_gradebook_dir_name(src_dir):
|
def validate_gradebook_dir_name(src_dir: str) -> None:
|
||||||
if not os.path.isdir(src_dir): # check if it exists and is a directory
|
if not os.path.isdir(src_dir): # check if it exists and is a directory
|
||||||
print(f"\n[Error] Incorrect directory: {src_dir}\n[Info] Make sure the directory exists in 'BB_gradebooks'")
|
print(f"\n[Error] Incorrect directory: {src_dir}\n[Info] Make sure the directory exists in 'BB_gradebooks'")
|
||||||
exit()
|
exit()
|
||||||
@@ -16,7 +16,7 @@ def validate_gradebook_dir_name(src_dir):
|
|||||||
exit()
|
exit()
|
||||||
|
|
||||||
|
|
||||||
def get_comment_from_submission_txt(file_path):
|
def get_comment_from_submission_txt(file_path: str) -> str | None:
|
||||||
no_comment_text = f'Comments:\nThere are no student comments for this assignment.'
|
no_comment_text = f'Comments:\nThere are no student comments for this assignment.'
|
||||||
no_comment_text_regex = no_comment_text
|
no_comment_text_regex = no_comment_text
|
||||||
no_comment_regex_compile = re.compile(no_comment_text_regex)
|
no_comment_regex_compile = re.compile(no_comment_text_regex)
|
||||||
@@ -30,9 +30,10 @@ def get_comment_from_submission_txt(file_path):
|
|||||||
match = str(match).replace('\\n', '').replace('[','').replace(']','').replace('"','')
|
match = str(match).replace('\\n', '').replace('[','').replace(']','').replace('"','')
|
||||||
match = str(match).split('Comments:')[-1]
|
match = str(match).split('Comments:')[-1]
|
||||||
return match
|
return match
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_gradebook_stats(src_dir):
|
def get_gradebook_stats(src_dir: str) -> dict[str, int]:
|
||||||
all_files = [ os.path.join(src_dir, f) for f in os.listdir(src_dir) if BAD_DIR_NAME not in f ]
|
all_files = [ os.path.join(src_dir, f) for f in os.listdir(src_dir) if BAD_DIR_NAME not in f ]
|
||||||
dirs = [ f for f in all_files if os.path.isdir(f) and BAD_DIR_NAME not in f ]
|
dirs = [ f for f in all_files if os.path.isdir(f) and BAD_DIR_NAME not in f ]
|
||||||
normal_files = [ f for f in all_files if os.path.isfile(f) ]
|
normal_files = [ f for f in all_files if os.path.isfile(f) ]
|
||||||
@@ -57,7 +58,7 @@ def get_gradebook_stats(src_dir):
|
|||||||
return files_counter
|
return files_counter
|
||||||
|
|
||||||
|
|
||||||
def organise_file_per_student(src_dir, dest_dir, file_name, student_no):
|
def organise_file_per_student(src_dir: str, dest_dir: str, file_name: str, student_no: str) -> None:
|
||||||
student_dir = os.path.join(dest_dir, student_no)
|
student_dir = os.path.join(dest_dir, student_no)
|
||||||
os.makedirs(student_dir, exist_ok=True) # create student directory if it doesn't exist
|
os.makedirs(student_dir, exist_ok=True) # create student directory if it doesn't exist
|
||||||
file_path = os.path.join(src_dir, file_name)
|
file_path = os.path.join(src_dir, file_name)
|
||||||
@@ -80,7 +81,7 @@ def organise_file_per_student(src_dir, dest_dir, file_name, student_no):
|
|||||||
shutil.move(file_path, new_file_path) # move the file to student directory
|
shutil.move(file_path, new_file_path) # move the file to student directory
|
||||||
|
|
||||||
|
|
||||||
def organise_gradebook(src_dir, dest_dir):
|
def organise_gradebook(src_dir: str, dest_dir: str) -> None:
|
||||||
"""1) extracts .zip, .rar, .7z files, organises contents into directories per student number, and deletes compressed files after successful extraction
|
"""1) extracts .zip, .rar, .7z files, organises contents into directories per student number, and deletes compressed files after successful extraction
|
||||||
2) organises all other files in gradebook into directories per student number
|
2) organises all other files in gradebook into directories per student number
|
||||||
3) checks if there are any comments in submission text files and extracts them into a file
|
3) checks if there are any comments in submission text files and extracts them into a file
|
||||||
@@ -88,7 +89,7 @@ def organise_gradebook(src_dir, dest_dir):
|
|||||||
validate_gradebook_dir_name(src_dir) # check if dir exists, and has files in it - exits if not
|
validate_gradebook_dir_name(src_dir) # check if dir exists, and has files in it - exits if not
|
||||||
os.makedirs(dest_dir, exist_ok=True) # create the destination directory if it doesn't exist
|
os.makedirs(dest_dir, exist_ok=True) # create the destination directory if it doesn't exist
|
||||||
files_counter = get_gradebook_stats(src_dir) # print stats about the files in gradebook and get files_counter dict to use later
|
files_counter = get_gradebook_stats(src_dir) # print stats about the files in gradebook and get files_counter dict to use later
|
||||||
students_numbers = [] # list to add and count unique student numbers from all files in gradebook
|
students_numbers: list[str] = [] # list to add and count unique student numbers from all files in gradebook
|
||||||
print('\nStart organising...\n')
|
print('\nStart organising...\n')
|
||||||
for file_name in os.listdir(src_dir): # iterate through all files in the directory
|
for file_name in os.listdir(src_dir): # iterate through all files in the directory
|
||||||
if BAD_DIR_NAME not in file_name: # ignore dir BAD_DIR_NAME (created after first run if corrupt compressed files found)
|
if BAD_DIR_NAME not in file_name: # ignore dir BAD_DIR_NAME (created after first run if corrupt compressed files found)
|
||||||
@@ -107,11 +108,11 @@ def organise_gradebook(src_dir, dest_dir):
|
|||||||
print(f'[Note] Compressed files (.zip, .rar, .7z) are automatically deleted from the gradebook directory after successful extraction')
|
print(f'[Note] Compressed files (.zip, .rar, .7z) are automatically deleted from the gradebook directory after successful extraction')
|
||||||
|
|
||||||
|
|
||||||
def check_submissions_dir_for_compressed(submissions_dir):
|
def check_submissions_dir_for_compressed(submissions_dir: str) -> None:
|
||||||
"""checks if any submitted compressed files contain more compressed files inside (they are not recursively extracted)
|
"""checks if any submitted compressed files contain more compressed files inside (they are not recursively extracted)
|
||||||
\nprints any compressed files location that need to be extracted manually
|
\nprints any compressed files location that need to be extracted manually
|
||||||
"""
|
"""
|
||||||
compressed_files = []
|
compressed_files: list[str] = []
|
||||||
abs_path = os.getcwd()
|
abs_path = os.getcwd()
|
||||||
for the_path, dirc, files in os.walk(submissions_dir):
|
for the_path, dirc, files in os.walk(submissions_dir):
|
||||||
for fname in files:
|
for fname in files:
|
||||||
|
|||||||
Reference in New Issue
Block a user