Compare commits

bd34e0f5cc ... 9ab782dcc9

4 Commits

| Author | SHA1 | Date |
|---|---|---|
|  | 9ab782dcc9 |  |
|  | aff2644676 |  |
|  | e1ef82fa18 |  |
|  | e4c15291db |  |
.gitignore (vendored): 2 lines changed

```diff
@@ -132,7 +132,7 @@ dmypy.json
 TODO
 BB_gradebooks/
 BB_submissions/
-csv/
+csv-inspect/
 
 !BB_gradebooks/README.md
 !BB_submissions/README.md
```
inspect_gradebook.py (new file): 16 lines

```diff
@@ -0,0 +1,16 @@
+import os, sys
+from utils.inspector import generate_hashes_gradebook, generate_duplicate_hashes_gradebook
+
+
+def main():
+    gradebook_dir_name = ' '.join(sys.argv[1:]) if len(sys.argv) > 1 else exit(f'\nNo gradebook dir name given. Provide the name as an argument.\n\nUsage: python {sys.argv[0]} [gradebook dir name]\nExample: python {sys.argv[0]} AssignmentX\n')
+
+    gradebook_dir_path = os.path.join('BB_gradebooks', gradebook_dir_name)
+    # generate CSV file with hashes for all files in gradebook & return path to CSV file for finding duplicate hashes
+    hashes_csv_file_path = generate_hashes_gradebook(gradebook_dir_path)
+    # generate CSV file with files having duplicate hashes
+    generate_duplicate_hashes_gradebook(hashes_csv_file_path)
+
+
+if __name__ == '__main__':
+    main()
```
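For context on how the gradebook side later recovers a Student ID (see generate_hashes_gradebook in the utils.inspector hunk further down): it leans on the naming convention of files downloaded from a Blackboard gradebook, where the username sits just before the `_attempt_` marker. A minimal sketch with a made-up filename; the exact pattern is an assumption, not something this diff specifies:

```python
# Hypothetical filename for illustration only; real gradebook downloads may differ.
filename = 'Assignment1_jdoe42_attempt_2024-03-01-10-15-30_report.pdf'

# Same expression as in generate_hashes_gradebook: take everything before
# '_attempt_', then keep the last underscore-separated token (the username).
student_id = filename.split('_attempt_')[0].split('_')[-1]
print(student_id)  # jdoe42
```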
The existing submissions inspection script is updated to the refactored utils.inspector API; the directory-existence check moves into the library functions:

```diff
@@ -1,16 +1,15 @@
 import os, sys
-from utils.inspector import hash_submissions, inspect_for_duplicate_hashes
+from utils.inspector import generate_hashes_submissions, generate_duplicate_hashes_submissions
 
-CSV_DIR = os.path.join(os.getcwd(), 'csv')
 
 def main():
     submissions_dir_name = ' '.join(sys.argv[1:]) if len(sys.argv) > 1 else exit(f'\nNo submissions dir name given. Provide the name as an argument.\n\nUsage: python {sys.argv[0]} [submissions dir name]\nExample: python {sys.argv[0]} AssignmentX\n')
 
     submissions_dir_path = os.path.join('BB_submissions', submissions_dir_name)
-    if not os.path.isdir(submissions_dir_path):
-        exit(f'Directory {submissions_dir_path} does not exist.\nMake sure "{submissions_dir_name}" exists in "BB_submissions".\n')
-    else:
-        hashes_csv_file_path = hash_submissions(submissions_dir_path) # generate CSV file with hashes for all files (except for any 'excluded') & return path to CSV file for finding duplicate/suspicious hashes
-        inspect_for_duplicate_hashes(hashes_csv_file_path) # generate CSV file with files having duplicate/suspicious hashes
+    # generate CSV file with hashes for all files in submissions (except for any 'excluded') & return path to CSV file for finding duplicate hashes
+    hashes_csv_file_path = generate_hashes_submissions(submissions_dir_path)
+    # generate CSV file with files having duplicate hashes
+    generate_duplicate_hashes_submissions(hashes_csv_file_path)
 
+
 if __name__ == '__main__':
```
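The refactor reduces both entry-point scripts to the same two calls, so the flow can also be driven from another script or a REPL. A minimal sketch, assuming the functions behave as shown in the utils.inspector hunk below and that an 'AssignmentX' directory (a placeholder name) exists under BB_submissions:

```python
import os

from utils.inspector import generate_duplicate_hashes_submissions, generate_hashes_submissions

# 'AssignmentX' is a placeholder assignment directory name.
submissions_dir_path = os.path.join('BB_submissions', 'AssignmentX')

# Step 1: write one CSV row (Student ID, filepath, filename, sha256 hash) per submitted file.
hashes_csv_file_path = generate_hashes_submissions(submissions_dir_path)

# Step 2: read that CSV back and write a second CSV listing only files whose
# hash appears under more than one Student ID.
generate_duplicate_hashes_submissions(hashes_csv_file_path)
```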
In the archive extraction helpers, the locally defined BAD_DIR_NAME is replaced by the shared constant from utils.settings:

```diff
@@ -2,7 +2,8 @@ import os, shutil, platform
 import zipfile, rarfile
 from py7zr import SevenZipFile, exceptions
 
-BAD_DIR_NAME = '__BAD__'
+from utils.settings import BAD_DIR_NAME
+
 
 def mark_file_as_BAD(file: str, bad_exception: Exception) -> None:
     try:
```
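Only the signature and the opening try: of mark_file_as_BAD appear in this hunk. As a rough, assumed sketch of what a helper with this name and the BAD_DIR_NAME constant typically does (quarantine an archive that failed to extract), not the actual body from the repository:

```python
import os
import shutil

from utils.settings import BAD_DIR_NAME  # '__BAD__' per utils/settings.py below


def mark_file_as_BAD(file: str, bad_exception: Exception) -> None:
    # Assumed behaviour: move the unreadable archive into a __BAD__ folder next to it.
    try:
        bad_dir = os.path.join(os.path.dirname(file), BAD_DIR_NAME)
        os.makedirs(bad_dir, exist_ok=True)
        shutil.move(file, os.path.join(bad_dir, os.path.basename(file)))
        print(f'[WARN] Could not extract {file}: {bad_exception}')
    except OSError as e:
        print(f'[ERROR] Could not move {file} into {BAD_DIR_NAME}: {e}')
```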
In utils.inspector (the module both scripts import from), CSV_DIR now comes from utils.settings, hashing is split into gradebook and submissions variants, and the duplicate-hash report is generalised and bound with functools.partial:

```diff
@@ -3,8 +3,9 @@ from datetime import datetime
 import csv
 import hashlib
 import pandas as pd
+from functools import partial
 
-CSV_DIR = os.path.join(os.getcwd(), 'csv')
+from utils.settings import CSV_DIR
 
 
 def load_excluded_filenames(submissions_dir_name: str) -> list[str]: # helper function for hashing all files
@@ -38,50 +39,85 @@ def get_hashes_in_dir(dir_path: str, excluded_filenames: list = []) -> list: #
     return hash_list
 
 
-def hash_submissions(submissions_dir_path: str) -> str: # main function for hashing all files
-    os.makedirs(CSV_DIR, exist_ok=True)
-    submissions_dir_name = os.path.abspath(submissions_dir_path).split(os.path.sep)[-1] # get name of submission/assignment by separating path and use rightmost part
-    excluded_filenames = load_excluded_filenames(submissions_dir_name)
+def generate_hashes_gradebook(gradebook_dir_path: str) -> str: # main function for hashing all files in gradebook
+    gradebook_dir_name = os.path.abspath(gradebook_dir_path).split(os.path.sep)[-1] # get name of gradebook by separating path and use rightmost part
+    if not os.path.isdir(gradebook_dir_path):
+        exit(f'Directory {gradebook_dir_path} does not exist.\nMake sure "{gradebook_dir_name}" exists in "BB_gradebooks".\n')
 
-    csv_file_name = f'{submissions_dir_name}_file_hashes_{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
+    dicts_with_hashes_list = get_hashes_in_dir(gradebook_dir_path)
+    for hash_dict in dicts_with_hashes_list:
+        student_id = hash_dict['filename'].split('_attempt_')[0].split('_')[-1]
+        del hash_dict['filepath']
+        hash_dict.update({'Student ID': student_id})
+
+    os.makedirs(CSV_DIR, exist_ok=True)
+    csv_file_name = f'{gradebook_dir_name}_gradebook_file_hashes_{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
     csv_file_path = os.path.join(CSV_DIR, csv_file_name)
 
+    with open(csv_file_path, 'w', newline='') as csvfile: # open the output CSV file for writing
+        fieldnames = ['Student ID', 'filename', 'sha256 hash']
+        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+        writer.writeheader()
+        writer.writerows(dicts_with_hashes_list)
+    print(f'[INFO] Created CSV file with all files & hashes in gradebook: {gradebook_dir_name}\nCSV file: {csv_file_path}')
+    return csv_file_path
+
+
+def generate_hashes_submissions(submissions_dir_path: str) -> str: # main function for hashing all files in submissions
+    submissions_dir_name = os.path.abspath(submissions_dir_path).split(os.path.sep)[-1] # get name of submission/assignment by separating path and use rightmost part
+    if not os.path.isdir(submissions_dir_path):
+        exit(f'Directory {submissions_dir_path} does not exist.\nMake sure "{submissions_dir_name}" exists in "BB_submissions".\n')
+
+    excluded_filenames = load_excluded_filenames(submissions_dir_name)
+    dicts_with_hashes_list = []
+    for student_dir_name in os.listdir(submissions_dir_path): # loop through each student dir to get hashes for all files per student
+        student_dir_path = os.path.join(submissions_dir_path, student_dir_name)
+        student_dicts_with_hashes_list = get_hashes_in_dir(student_dir_path, excluded_filenames) # dict with hashes for all student files - except for 'excluded' file names
+        student_dicts_list = []
+        for hash_dict in student_dicts_with_hashes_list:
+            hash_dict.update({'Student ID': student_dir_name}) # update hash records with student id
+            student_dicts_list.append(hash_dict) # append file dict to student list of dict for csv export
+
+        dicts_with_hashes_list.append(student_dicts_list) # append student hashes to main list with all submissions
+
+    os.makedirs(CSV_DIR, exist_ok=True)
+    csv_file_name = f'{submissions_dir_name}_submissions_file_hashes_{datetime.now().strftime("%Y%m%d-%H%M%S")}.csv'
+    csv_file_path = os.path.join(CSV_DIR, csv_file_name)
+
     with open(csv_file_path, 'w', newline='') as csvfile: # open the output CSV file for writing
         fieldnames = ['Student ID', 'filepath', 'filename', 'sha256 hash']
         writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
         writer.writeheader()
-        for student_dir_name in os.listdir(submissions_dir_path): # loop through each student dir to get hashes for all files per student
-            student_dir_path = os.path.join(submissions_dir_path, student_dir_name)
-            hashes_dict = get_hashes_in_dir(student_dir_path, excluded_filenames) # dict with hashes for all student files - except for 'excluded' file names
-            for d in hashes_dict:
-                d.update({'Student ID': student_dir_name}) # update hash records with student id
-            writer.writerows(hashes_dict)
-    print(f'[INFO] Created CSV file with all files & hashes in {submissions_dir_name}\nCSV file: {csv_file_path}')
+        for student_dict in dicts_with_hashes_list:
+            writer.writerows(student_dict)
+    print(f'[INFO] Created CSV file with all files & hashes for submissions in: {submissions_dir_name}\nCSV file: {csv_file_path}')
     return csv_file_path
 
 
-def inspect_for_duplicate_hashes(hashes_csv_file_path: str): # main function for finding duplicate / suspicious hashes
+def generate_duplicate_hashes_generic(hashes_csv_file_path: str, drop_columns: list[str]):
     csv = pd.read_csv(hashes_csv_file_path)
     df = pd.DataFrame(csv) # df with all files and their hashes
-    drop_columns = ['filepath', 'filename'] # only need to keep 'student id' and 'sha256 hash' for groupby later
     df_clean = df.drop(columns=drop_columns) # clear not needed columns
     duplicate_hash = df_clean.loc[df_clean.duplicated(subset=['sha256 hash'], keep=False), :] # all files with duplicate hash - incl. files from the same student id
 
     # agg() for 'Student ID' True if more than 1 in groupby (= files with the same hash by multiple student ids)
     # False if unique (= files from the same student id with the same hash)
     hash_with_multiple_student_ids = duplicate_hash.groupby('sha256 hash').agg(lambda x: len(x.unique())>1)
 
     # list with duplicate hashes - only if different student id (doesn't include files from same student id)
-    suspicious_hashes_list = hash_with_multiple_student_ids[hash_with_multiple_student_ids['Student ID']==True].index.to_list()
+    duplicate_hashes_list = hash_with_multiple_student_ids[hash_with_multiple_student_ids['Student ID']==True].index.to_list()
 
-    files_with_suspicious_hash = df[df['sha256 hash'].isin(suspicious_hashes_list)] # df with all files with duplicate/suspicious hash, excludes files from the same student id
-    df_suspicious = files_with_suspicious_hash.sort_values(['sha256 hash', 'Student ID']) # sort before output to csv
+    files_with_duplicate_hash = df[df['sha256 hash'].isin(duplicate_hashes_list)] # df with all files with duplicate hash, excludes files from the same student id
+    df_duplicate = files_with_duplicate_hash.sort_values(['sha256 hash', 'Student ID']) # sort before output to csv
 
+    gradebook_or_submissions_str = os.path.basename(hashes_csv_file_path).split('_file_hashes_')[0].split('_')[-1] # 'gradebook' or 'submissions' depending on which files hashes csv is read
+    assignment_name = os.path.basename(hashes_csv_file_path).split(f'_{gradebook_or_submissions_str}_')[0]
+    csv_out = hashes_csv_file_path.rsplit('_', 1)[0].replace('file_hashes', 'duplicate_') + datetime.now().strftime("%Y%m%d-%H%M%S") + '.csv'
     try:
-        submissions_dir_name = os.path.basename(hashes_csv_file_path).split('_file_hashes_')[0]
-        csv_out = hashes_csv_file_path.rsplit('_', 1)[0].replace('file_hashes', 'suspicious_') + datetime.now().strftime("%Y%m%d-%H%M%S") + '.csv'
-        df_suspicious.to_csv(csv_out, index=False)
-        print(f'[INFO] Created CSV file with duplicate/suspicious hashes in {submissions_dir_name}\nCSV file: {csv_out}')
+        df_duplicate.to_csv(csv_out, index=False)
+        print(f'[INFO] Created CSV file with duplicate hashes in {gradebook_or_submissions_str}: {assignment_name}\nCSV file: {csv_out}')
     except Exception as e:
-        exit(f'[ERROR] Something went wrong while trying to save csv file with suspicious hashes\nError message: {e}')
+        exit(f'[ERROR] Something went wrong while trying to save csv file with duplicate hashes\nError message: {e}')
+
+
+# partials for generate_duplicate_hashes_generic(), setting the appropriate drop_columns for gradebook / submissions
+generate_duplicate_hashes_gradebook = partial(generate_duplicate_hashes_generic, drop_columns=['filename'])
+generate_duplicate_hashes_submissions = partial(generate_duplicate_hashes_generic, drop_columns=['filepath', 'filename'])
```
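The heart of generate_duplicate_hashes_generic is the groupby/agg step: duplicated hashes are kept only when they occur under more than one Student ID, so a student submitting the same file twice is not flagged. A self-contained sketch of that logic on fabricated data (the hashes and IDs below are invented for illustration):

```python
import pandas as pd

# Toy hash table: 'aaa' is shared by two different students, 'bbb' is one student
# submitting the same content twice, 'ccc' is unique.
df = pd.DataFrame({
    'Student ID': ['s1', 's2', 's2', 's2', 's3'],
    'filename': ['a.py', 'a_copy.py', 'b.py', 'b_dup.py', 'c.py'],
    'sha256 hash': ['aaa', 'aaa', 'bbb', 'bbb', 'ccc'],
})

df_clean = df.drop(columns=['filename'])  # keep only Student ID + hash
duplicate_hash = df_clean.loc[df_clean.duplicated(subset=['sha256 hash'], keep=False), :]
multi_student = duplicate_hash.groupby('sha256 hash').agg(lambda x: len(x.unique()) > 1)
flagged = multi_student[multi_student['Student ID'] == True].index.to_list()

print(flagged)                              # ['aaa']  ('bbb' is same-student, so ignored)
print(df[df['sha256 hash'].isin(flagged)])  # the two rows sharing hash 'aaa'
```

The two public names, generate_duplicate_hashes_gradebook and generate_duplicate_hashes_submissions, are then just functools.partial bindings of the generic function with the right drop_columns, so the CSV-reading and groupby code lives in one place.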
The gradebook organiser likewise imports BAD_DIR_NAME from utils.settings instead of defining it locally:

```diff
@@ -1,7 +1,6 @@
 import os, shutil, re
 from utils.extractor import extract_file_to_dir
+from utils.settings import BAD_DIR_NAME
 
-BAD_DIR_NAME = '__BAD__'
-
 
 def validate_gradebook_dir_name(src_dir: str) -> None:
```
utils/settings.py (new file): 4 lines

```diff
@@ -0,0 +1,4 @@
+import os
+
+BAD_DIR_NAME = '__BAD__' # for organise_gradebook.py - directory with corrupt/invalid compressed files
+CSV_DIR = os.path.join(os.getcwd(), 'csv-inspect') # for inspect_gradebook.py and inspect_submissions.py - output dir for generated CSV files
```
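With the constants collected in utils/settings.py, the other modules only re-import them; a tiny illustrative sketch. Note that CSV_DIR is built from os.getcwd(), so (as far as this diff shows) the csv-inspect output directory resolves relative to wherever the scripts are run from:

```python
# Single source of truth for shared constants instead of per-module copies.
from utils.settings import BAD_DIR_NAME, CSV_DIR

print(BAD_DIR_NAME)  # __BAD__
print(CSV_DIR)       # <current working directory>/csv-inspect
```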