pattern · python · Minor
Robust .mpg file copier
Viewed 0 times
Tags: copier, file, mpg, robust
Problem
```
#!/usr/bin/env python
"""
Safely copies .mpg files from one volume to another using
temporary filenames and hash verification
"""
import os
from datetime import datetime
from hashlib import sha256
from shutil import copy2
#ask user for origin (where to get files)
def choose_drive(drives, label):
    """
    Show a numbered menu of `drives` and return the chosen volume's path.

    drives -- list of volume names (the entries of /Volumes)
    label  -- word shown in the prompt, e.g. 'Origin' or 'Destination'

    Keeps prompting until the user types a number between 1 and
    len(drives).  Returns os.path.join('/Volumes', <chosen name>).
    Raises ValueError if `drives` is empty, since no answer could ever
    be valid.
    """
    if not drives:
        raise ValueError('no drives to choose from')
    # raw_input only exists on Python 2; fall back to input on Python 3.
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    while True:
        # enumerate numbers each entry by position, so duplicate names
        # no longer all get the number of their first occurrence.
        for number, name in enumerate(drives, 1):
            print('%s - %s' % (number, name))
        answer = read_line('Select %s: ' % label)
        # Parse the answer as an integer and range-check it.  The old
        # test looked for the answer as a substring of str(range(...)),
        # so '', ',', ' ' and '0' were accepted and then either crashed
        # int() or silently selected the last drive.
        try:
            choice = int(answer)
        except ValueError:
            continue
        if 1 <= choice <= len(drives):
            return os.path.join('/Volumes', drives[choice - 1])
# safe copy
def safe_copy(src, dst, max_attempts=3):
    """
    Copy `src` to `dst` without ever exposing a partial file under `dst`.

    The data is first written to dst + '_INCOMPLETE'; only after
    compare_hashes() confirms the copy matches the source is it renamed
    to `dst`.  A mismatching copy is retried, at most `max_attempts`
    times in total (the old code recursed without any limit).

    Failures are printed rather than raised, as before.  Returns True if
    `dst` was written and verified, False otherwise.  A failed or
    unverified temporary file is left in place.
    """
    dst_temp_name = dst + '_INCOMPLETE'
    for _ in range(max_attempts):
        try:
            copy2(src, dst_temp_name)
            verified = compare_hashes(src, dst_temp_name)
        except Exception as e:
            # Deliberately broad: this function has always reported
            # every copy/hash failure instead of propagating it.
            print('Error trying to copy the file:  %s' % (e,))
            return False
        if not verified:
            continue
        try:
            os.rename(dst_temp_name, dst)
        except Exception as e:
            print('Error trying to rename the file:  %s' % (e,))
            return False
        return True
    print('Error trying to copy the file:  hash mismatch after %d attempts'
          % max_attempts)
    return False
# hash a file and return the hash string
def get_local_hash(filename, block_size=2**20):
    """
    Return the SHA-256 hex digest of `filename` as a text string.

    The file is read `block_size` bytes at a time, so it is never held
    in memory as a whole.  If the file cannot be opened the error is
    printed and then re-raised (IOError/OSError); previously execution
    continued and failed with a NameError on the unopened handle.
    """
    print('Hashing %s' % (filename,))
    try:
        local_file = open(filename, 'rb')
    except (IOError, OSError) as e:
        print('Error while opening file for hashing:  %s' % (e,))
        raise
    hasher = sha256()
    # The with block closes the handle even if a read fails part-way.
    with local_file:
        for data in iter(lambda: local_file.read(block_size), b''):
            hasher.update(data)
    result = hasher.hexdigest()
    # hexdigest() is a byte string on Python 2; callers have always been
    # given text, so decode it there (a no-op on Python 3).
    if isinstance(result, bytes):
        result = result.decode('ascii')
    return result
def compare_hashes(file1, file2):
    """
    Return True if both files have the same get_local_hash() digest,
    False otherwise (the old version fell through and returned None).
    """
    return get_local_hash(file1) == get_local_hash(file2)
#main
def main():
# get list of volumes
drives = [d for d in os.listdir('/Volumes')]
# ask user for origin drive
origin = choose_drive(drives, 'Origin')
ori
#!/usr/bin/env python
"""
Safely copies .mpg files from one volume to another using
temporary filenames and hash verification
"""
import os
from datetime import datetime
from hashlib import sha256
from shutil import copy2
#ask user for origin (where to get files)
def choose_drive(drives, label):
    """
    Show a numbered menu of `drives` and return the chosen volume's path.

    drives -- list of volume names (the entries of /Volumes)
    label  -- word shown in the prompt, e.g. 'Origin' or 'Destination'

    Keeps prompting until the user types a number between 1 and
    len(drives).  Returns os.path.join('/Volumes', <chosen name>).
    Raises ValueError if `drives` is empty, since no answer could ever
    be valid.
    """
    if not drives:
        raise ValueError('no drives to choose from')
    # raw_input only exists on Python 2; fall back to input on Python 3.
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    while True:
        # enumerate numbers each entry by position, so duplicate names
        # no longer all get the number of their first occurrence.
        for number, name in enumerate(drives, 1):
            print('%s - %s' % (number, name))
        answer = read_line('Select %s: ' % label)
        # Parse the answer as an integer and range-check it.  The old
        # test looked for the answer as a substring of str(range(...)),
        # so '', ',', ' ' and '0' were accepted and then either crashed
        # int() or silently selected the last drive.
        try:
            choice = int(answer)
        except ValueError:
            continue
        if 1 <= choice <= len(drives):
            return os.path.join('/Volumes', drives[choice - 1])
# safe copy
def safe_copy(src, dst, max_attempts=3):
    """
    Copy `src` to `dst` without ever exposing a partial file under `dst`.

    The data is first written to dst + '_INCOMPLETE'; only after
    compare_hashes() confirms the copy matches the source is it renamed
    to `dst`.  A mismatching copy is retried, at most `max_attempts`
    times in total (the old code recursed without any limit).

    Failures are printed rather than raised, as before.  Returns True if
    `dst` was written and verified, False otherwise.  A failed or
    unverified temporary file is left in place.
    """
    dst_temp_name = dst + '_INCOMPLETE'
    for _ in range(max_attempts):
        try:
            copy2(src, dst_temp_name)
            verified = compare_hashes(src, dst_temp_name)
        except Exception as e:
            # Deliberately broad: this function has always reported
            # every copy/hash failure instead of propagating it.
            print('Error trying to copy the file:  %s' % (e,))
            return False
        if not verified:
            continue
        try:
            os.rename(dst_temp_name, dst)
        except Exception as e:
            print('Error trying to rename the file:  %s' % (e,))
            return False
        return True
    print('Error trying to copy the file:  hash mismatch after %d attempts'
          % max_attempts)
    return False
# hash a file and return the hash string
def get_local_hash(filename, block_size=2**20):
    """
    Return the SHA-256 hex digest of `filename` as a text string.

    The file is read `block_size` bytes at a time, so it is never held
    in memory as a whole.  If the file cannot be opened the error is
    printed and then re-raised (IOError/OSError); previously execution
    continued and failed with a NameError on the unopened handle.
    """
    print('Hashing %s' % (filename,))
    try:
        local_file = open(filename, 'rb')
    except (IOError, OSError) as e:
        print('Error while opening file for hashing:  %s' % (e,))
        raise
    hasher = sha256()
    # The with block closes the handle even if a read fails part-way.
    with local_file:
        for data in iter(lambda: local_file.read(block_size), b''):
            hasher.update(data)
    result = hasher.hexdigest()
    # hexdigest() is a byte string on Python 2; callers have always been
    # given text, so decode it there (a no-op on Python 3).
    if isinstance(result, bytes):
        result = result.decode('ascii')
    return result
def compare_hashes(file1, file2):
    """
    Return True if both files have the same get_local_hash() digest,
    False otherwise (the old version fell through and returned None).
    """
    return get_local_hash(file1) == get_local_hash(file2)
#main
def main():
# get list of volumes
drives = [d for d in os.listdir('/Volumes')]
# ask user for origin drive
origin = choose_drive(drives, 'Origin')
ori
Solution
Responses in comments starting with koj -.
#!/usr/bin/env python
"""
Safely copies .mpg files from one volume to another using
temporary filenames and hash verification
"""
import os
from datetime import datetime
from hashlib import sha256
from shutil import copy2
#ask user for origin (where to get files)
def choose_drive(drives, label):
# koj - needs documentation
# Prints a numbered menu of `drives`, reads a selection with raw_input and
# returns os.path.join('/Volumes', <selected drive>); the menu is shown
# again whenever the input fails the membership test below.
while True:
for d in drives: # koj - use for d, drive in enumerate(drives)
print '%s - %s' % (drives.index(d)+1,d) # koj - print '%d - %s' % (d+1,drive)
drive = raw_input('Select %s: ' % label)
# koj - cast to int here inside a try/except. (except ValueError, continue).
# NOTE: this is a substring test against the text of the list (e.g.
# "[0, 1, 2]"), so inputs such as '', ',' or '0' also pass it.
if not drive in str(range(0, len(drives)+1)): # koj - Then, no cast to string
continue
else:
return os.path.join('/Volumes', drives[int(drive)-1]) # koj - no need to cast to int anymore
# safe copy
def safe_copy(src, dst):
# koj - needs documentation
# Copies src to dst+'_INCOMPLETE' with copy2 and renames that file to dst
# once compare_hashes() reports a match. Exceptions are caught and printed.
dst_temp_name = dst+'_INCOMPLETE'
try:
copy2(src,dst_temp_name)
if compare_hashes(src, dst_temp_name):
try:
os.rename(dst_temp_name, dst)
except Exception as e:
print 'Error trying to rename the file: ', e
else:
# Hash mismatch: start the whole copy again. There is no retry limit.
safe_copy(src, dst)
except Exception as e:
print 'Error trying to copy the file: ', e
# hash a file and return the hash string
def get_local_hash(filename, block_size=2**20):
"""
Hashes a file using SHA-256, reading it block_size bytes at a time,
and returns the hex digest as a unicode string
"""
# print 'Starting hash of %s at %s' % (filename, datetime.now().time())
print 'Hashing', filename
try: # koj - use a with statement to manage the file handle
local_file = open(filename,'rb')
except Exception as e:
print 'Error while opening file for hashing: ', e
# NOTE: execution continues after a failed open, so local_file is unbound
# when the read loop below is reached.
hasher = sha256()
while True:
data = local_file.read(block_size)
if not data:
break
hasher.update(data)
# print 'Finished hash at %s' % datetime.now().time()
result = hasher.hexdigest()
local_file.close()
return unicode(result)
def compare_hashes(file1, file2):
# koj - needs documentation
# Returns True when both files have the same get_local_hash() digest;
# otherwise falls off the end, which returns None rather than False.
if get_local_hash(file1) == get_local_hash(file2):
return True # koj - sure, but why not just return get_local_hash(file1) == get_local_hash(file2) ?
#main
# koj - did you really need that label?
def main():
# Asks for an origin and a destination volume, then goes through the
# origin's .mpg files: a file whose hash matches the destination copy is
# skipped, every other file is passed to safe_copy().
# get list of volumes
drives = [d for d in os.listdir('/Volumes')]
# ask user for origin drive
origin = choose_drive(drives, 'Origin')
origin_files = os.listdir(origin) # koj - filter here: (orig for orig in os.listdir(origin) if orig.endswith('.mpg'))
# ask user for destination drive
destination = choose_drive(drives, 'Destination')
destination_files = os.listdir(destination)
# for each file on origin, check if it exists on destination
for of in origin_files:
if of.endswith('.mpg'): # koj - see the filter suggested on the origin_files line above, don't need this.
if of in destination_files:
if compare_hashes(os.path.join(origin, of), os.path.join(destination, of)):
print 'already exists and is identical... skipping', of
else:
print 'already exists but is different... copying', of
safe_copy(os.path.join(origin, of), os.path.join(destination, of))
else:
print 'does not exists... copying', of
safe_copy(os.path.join(origin, of), os.path.join(destination, of))
if __name__ == "__main__":
main()
Code Snippets
#!/usr/bin/env python
"""
Safely copies .mpg files from one volume to another using
temporary filenames and hash verification
"""
import os
from datetime import datetime
from hashlib import sha256
from shutil import copy2
#ask user for origin (where to get files)
def choose_drive(drives, label):
# koj - needs documentation
# Prints a numbered menu of `drives`, reads a selection with raw_input and
# returns os.path.join('/Volumes', <selected drive>); the menu is shown
# again whenever the input fails the membership test below.
while True:
for d in drives: # koj - use for d, drive in enumerate(drives)
print '%s - %s' % (drives.index(d)+1,d) # koj - print '%d - %s' % (d+1,drive)
drive = raw_input('Select %s: ' % label)
# koj - cast to int here inside a try/except. (except ValueError, continue).
# NOTE: this is a substring test against the text of the list (e.g.
# "[0, 1, 2]"), so inputs such as '', ',' or '0' also pass it.
if not drive in str(range(0, len(drives)+1)): # koj - Then, no cast to string
continue
else:
return os.path.join('/Volumes', drives[int(drive)-1]) # koj - no need to cast to int anymore
# safe copy
def safe_copy(src, dst):
# koj - needs documentation
# Copies src to dst+'_INCOMPLETE' with copy2 and renames that file to dst
# once compare_hashes() reports a match. Exceptions are caught and printed.
dst_temp_name = dst+'_INCOMPLETE'
try:
copy2(src,dst_temp_name)
if compare_hashes(src, dst_temp_name):
try:
os.rename(dst_temp_name, dst)
except Exception as e:
print 'Error trying to rename the file: ', e
else:
# Hash mismatch: start the whole copy again. There is no retry limit.
safe_copy(src, dst)
except Exception as e:
print 'Error trying to copy the file: ', e
# hash a file and return the hash string
def get_local_hash(filename, block_size=2**20):
"""
Hashes a file using SHA-256, reading it block_size bytes at a time,
and returns the hex digest as a unicode string
"""
# print 'Starting hash of %s at %s' % (filename, datetime.now().time())
print 'Hashing', filename
try: # koj - use a with statement to manage the file handle
local_file = open(filename,'rb')
except Exception as e:
print 'Error while opening file for hashing: ', e
# NOTE: execution continues after a failed open, so local_file is unbound
# when the read loop below is reached.
hasher = sha256()
while True:
data = local_file.read(block_size)
if not data:
break
hasher.update(data)
# print 'Finished hash at %s' % datetime.now().time()
result = hasher.hexdigest()
local_file.close()
return unicode(result)
def compare_hashes(file1, file2):
# koj - needs documentation
# Returns True when both files have the same get_local_hash() digest;
# otherwise falls off the end, which returns None rather than False.
if get_local_hash(file1) == get_local_hash(file2):
return True # koj - sure, but why not just return get_local_hash(file1) == get_local_hash(file2) ?
#main
# koj - did you really need that label?
def main():
# get list of volumes
drives = [d for d in os.listdir('/Volumes')]
# ask user for origin drive
origin = choose_drive(drives, 'Origin')
origin_files = os.listdir(origin) # koj - filter here: (orig for orig in os.listdir(origin) if orig.endswith('.mpg'))
# ask user for destination drive
destination = choose_drive(drives, 'Destination')
destination_files = os.listdir(destination)
# for each file on origin, check if it exists on destination
for of in origin_files:
if of.endswith('.mpg'): # koj - see filter on line 76 above, don't need this.
if of in destination_files:
Context
StackExchange Code Review Q#10584, answer score: 3
Revisions (0)
No revisions yet.