HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonMinor

Robust .mpg file copier

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
copierfilempgrobust

Problem

```
#!/usr/bin/env python

"""
Safely copies .mpg files from one volume to another using
temporary filenames and hash verification
"""

import os
from datetime import datetime
from hashlib import sha256
from shutil import copy2

#ask user for origin (where to get files)
def choose_drive(drives, label):
while True:
for d in drives:
print '%s - %s' % (drives.index(d)+1,d)
drive = raw_input('Select %s: ' % label)
if not drive in str(range(0, len(drives)+1)):
continue
else:
return os.path.join('/Volumes', drives[int(drive)-1])

# safe copy
def safe_copy(src, dst):
dst_temp_name = dst+'_INCOMPLETE'
try:
copy2(src,dst_temp_name)
if compare_hashes(src, dst_temp_name):
try:
os.rename(dst_temp_name, dst)
except Exception as e:
print 'Error trying to rename the file: ', e
else:
safe_copy(src, dst)
except Exception as e:
print 'Error trying to copy the file: ', e

# hash a file and return the hash string
def get_local_hash(filename, block_size=2**20):
"""
Hashes a file using SHA1
"""
# print 'Starting hash of %s at %s' % (filename, datetime.now().time())
print 'Hashing', filename
try:
local_file = open(filename,'rb')
except Exception as e:
print 'Error while opening file for hashing: ', e
hasher = sha256()
while True:
data = local_file.read(block_size)
if not data:
break
hasher.update(data)
# print 'Finished hash at %s' % datetime.now().time()
result = hasher.hexdigest()
local_file.close()
return unicode(result)

def compare_hashes(file1, file2):
if get_local_hash(file1) == get_local_hash(file2):
return True

#main
def main():
# get list of volumes
drives = [d for d in os.listdir('/Volumes')]
# ask user for origin drive
origin = choose_drive(drives, 'Origin')
ori

Solution

Responses in comments starting with koj -.

#!/usr/bin/env python

"""
Safely copies .mpg files from one volume to another using
temporary filenames and hash verification
"""

import os
from datetime import datetime
from hashlib import sha256
from shutil import copy2

#ask user for origin (where to get files)
def choose_drive(drives, label):
    # koj - needs documentation
    while True:
        for d in drives: # koj - use for d, drive in enumerate(drives)
            print '%s - %s' % (drives.index(d)+1,d) # koj - print '%d - %s' % (d+1,drive)
        drive = raw_input('Select %s: ' % label)
        # koj - cast to int here inside a try/except. (except ValueError, continue).
        if not drive in str(range(0, len(drives)+1)): # koj - Then, no cast to string
            continue
        else:
            return os.path.join('/Volumes', drives[int(drive)-1]) # koj - no need to cast to int anymore

# safe copy
def safe_copy(src, dst):
    # koj - needs documentation
    dst_temp_name = dst+'_INCOMPLETE'
    try:
        copy2(src,dst_temp_name)
        if compare_hashes(src, dst_temp_name):
            try:
                os.rename(dst_temp_name, dst)
            except Exception as e:
                print 'Error trying to rename the file: ', e
        else:
            safe_copy(src, dst)
    except Exception as e:
        print 'Error trying to copy the file: ', e

# hash a file and return the hash string
def get_local_hash(filename, block_size=2**20):
    """
    Hashes a file using SHA1
    """
    # print 'Starting hash of %s at %s' % (filename, datetime.now().time())
    print 'Hashing', filename
    try: # koj - use a with statement to manage the file handle
        local_file = open(filename,'rb')
    except Exception as e:
        print 'Error while opening file for hashing: ', e
    hasher = sha256()
    while True:
        data = local_file.read(block_size)
        if not data:
            break
        hasher.update(data)
    # print 'Finished hash at %s' % datetime.now().time()
    result = hasher.hexdigest()
    local_file.close()
    return unicode(result)

def compare_hashes(file1, file2):
    # koj - needs documentation
    if get_local_hash(file1) == get_local_hash(file2):
        return True # koj - sure, but why not just return get_local_hash(file1) == get_local_hash(file2) ?

#main
# koj - did you really need that label?
def main():
    # get list of volumes
    drives = [d for d in os.listdir('/Volumes')]
    # ask user for origin drive
    origin = choose_drive(drives, 'Origin')
    origin_files = os.listdir(origin) # koj - filter here: (orig for orig in os.listdir(origin) if orig.endswith('.mpg'))
    # ask user for destination drive
    destination = choose_drive(drives, 'Destination')
    destination_files = os.listdir(destination)
    # for each file on origin, check if it exists on destination
    for of in origin_files:
        if of.endswith('.mpg'): # koj - see filter on line 76 above, don't need this.
            if of in destination_files:
                if compare_hashes(os.path.join(origin, of), os.path.join(destination, of)):
                    print 'already exists and is identical... skipping', of
                else:
                    print 'already exists but is different... copying', of
                    safe_copy(os.path.join(origin, of), os.path.join(destination, of))
            else:
                print 'does not exists... copying', of
                safe_copy(os.path.join(origin, of), os.path.join(destination, of))

if __name__ == "__main__":

    main()

Code Snippets

#!/usr/bin/env python

"""
Safely copies .mpg files from one volume to another using
temporary filenames and hash verification
"""

import os
from datetime import datetime
from hashlib import sha256
from shutil import copy2

#ask user for origin (where to get files)
def choose_drive(drives, label):
    # koj - needs documentation
    while True:
        for d in drives: # koj - use for d, drive in enumerate(drives)
            print '%s - %s' % (drives.index(d)+1,d) # koj - print '%d - %s' % (d+1,drive)
        drive = raw_input('Select %s: ' % label)
        # koj - cast to int here inside a try/except. (except ValueError, continue).
        if not drive in str(range(0, len(drives)+1)): # koj - Then, no cast to string
            continue
        else:
            return os.path.join('/Volumes', drives[int(drive)-1]) # koj - no need to cast to int anymore

# safe copy
def safe_copy(src, dst):
    # koj - needs documentation
    dst_temp_name = dst+'_INCOMPLETE'
    try:
        copy2(src,dst_temp_name)
        if compare_hashes(src, dst_temp_name):
            try:
                os.rename(dst_temp_name, dst)
            except Exception as e:
                print 'Error trying to rename the file: ', e
        else:
            safe_copy(src, dst)
    except Exception as e:
        print 'Error trying to copy the file: ', e

# hash a file and return the hash string
def get_local_hash(filename, block_size=2**20):
    """
    Hashes a file using SHA1
    """
    # print 'Starting hash of %s at %s' % (filename, datetime.now().time())
    print 'Hashing', filename
    try: # koj - use a with statement to manage the file handle
        local_file = open(filename,'rb')
    except Exception as e:
        print 'Error while opening file for hashing: ', e
    hasher = sha256()
    while True:
        data = local_file.read(block_size)
        if not data:
            break
        hasher.update(data)
    # print 'Finished hash at %s' % datetime.now().time()
    result = hasher.hexdigest()
    local_file.close()
    return unicode(result)

def compare_hashes(file1, file2):
    # koj - needs documentation
    if get_local_hash(file1) == get_local_hash(file2):
        return True # koj - sure, but why not just return get_local_hash(file1) == get_local_hash(file2) ?

#main
# koj - did you really need that label?
def main():
    # get list of volumes
    drives = [d for d in os.listdir('/Volumes')]
    # ask user for origin drive
    origin = choose_drive(drives, 'Origin')
    origin_files = os.listdir(origin) # koj - filter here: (orig for orig in os.listdir(origin) if orig.endswith('.mpg'))
    # ask user for destination drive
    destination = choose_drive(drives, 'Destination')
    destination_files = os.listdir(destination)
    # for each file on origin, check if it exists on destination
    for of in origin_files:
        if of.endswith('.mpg'): # koj - see filter on line 76 above, don't need this.
            if of in destination_files:
             

Context

StackExchange Code Review Q#10584, answer score: 3

Revisions (0)

No revisions yet.