Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 101 additions & 131 deletions bagit.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,95 +173,67 @@ def make_bag(
LOGGER.error(_("Bag directory %s does not exist"), bag_dir)
raise RuntimeError(_("Bag directory %s does not exist") % bag_dir)

# FIXME: we should do the permissions checks before changing directories
old_dir = os.path.abspath(os.path.curdir)

try:
# TODO: These two checks are currently redundant since an unreadable directory will also
# often be unwritable, and this code will require review when we add the option to
# bag to a destination other than the source. It would be nice if we could avoid
# walking the directory tree more than once even if most filesystems will cache it
# An exception is raised if any permissions will prevent the bagging of this path
_check_can_bag(bag_dir)

unbaggable = _can_bag(bag_dir)
LOGGER.info(_("Creating data directory"))

if unbaggable:
LOGGER.error(
_("Unable to write to the following directories and files:\n%s"),
unbaggable,
)
raise BagError(_("Missing permissions to move all files and directories"))

unreadable_dirs, unreadable_files = _can_read(bag_dir)

if unreadable_dirs or unreadable_files:
if unreadable_dirs:
LOGGER.error(
_("The following directories do not have read permissions:\n%s"),
unreadable_dirs,
)
if unreadable_files:
LOGGER.error(
_("The following files do not have read permissions:\n%s"),
unreadable_files,
)
raise BagError(
_("Read permissions are required to calculate file fixities")
)
else:
LOGGER.info(_("Creating data directory"))

# FIXME: if we calculate full paths we won't need to deal with changing directories
os.chdir(bag_dir)
cwd = os.getcwd()
temp_data = tempfile.mkdtemp(dir=cwd)

for f in os.listdir("."):
if os.path.abspath(f) == temp_data:
continue
new_f = os.path.join(temp_data, f)
LOGGER.info(
_("Moving %(source)s to %(destination)s"),
{"source": f, "destination": new_f},
)
os.rename(f, new_f)
# FIXME: if we calculate full paths we won't need to deal with changing directories
os.chdir(bag_dir)
cwd = os.getcwd()
temp_data = tempfile.mkdtemp(dir=cwd)

for f in os.listdir("."):
if os.path.abspath(f) == temp_data:
continue
new_f = os.path.join(temp_data, f)
LOGGER.info(
_("Moving %(source)s to %(destination)s"),
{"source": temp_data, "destination": "data"},
{"source": f, "destination": new_f},
)
os.rename(temp_data, "data")
os.rename(f, new_f)

# permissions for the payload directory should match those of the
# original directory
os.chmod("data", os.stat(cwd).st_mode)
LOGGER.info(
_("Moving %(source)s to %(destination)s"),
{"source": temp_data, "destination": "data"},
)
os.rename(temp_data, "data")

total_bytes, total_files = make_manifests(
"data", processes, algorithms=checksums, encoding=encoding
# permissions for the payload directory should match those of the
# original directory
os.chmod("data", os.stat(cwd).st_mode)

total_bytes, total_files = make_manifests(
"data", processes, algorithms=checksums, encoding=encoding
)

LOGGER.info(_("Creating bagit.txt"))
txt = """BagIt-Version: 0.97\nTag-File-Character-Encoding: UTF-8\n"""
with open_text_file("bagit.txt", "w") as bagit_file:
bagit_file.write(txt)

LOGGER.info(_("Creating bag-info.txt"))
if bag_info is None:
bag_info = {}

# allow 'Bagging-Date' and 'Bag-Software-Agent' to be overidden
if "Bagging-Date" not in bag_info:
bag_info["Bagging-Date"] = date.strftime(date.today(), "%Y-%m-%d")
if "Bag-Software-Agent" not in bag_info:
bag_info["Bag-Software-Agent"] = "bagit.py v%s <%s>" % (
VERSION,
PROJECT_URL,
)

LOGGER.info(_("Creating bagit.txt"))
txt = """BagIt-Version: 0.97\nTag-File-Character-Encoding: UTF-8\n"""
with open_text_file("bagit.txt", "w") as bagit_file:
bagit_file.write(txt)

LOGGER.info(_("Creating bag-info.txt"))
if bag_info is None:
bag_info = {}

# allow 'Bagging-Date' and 'Bag-Software-Agent' to be overidden
if "Bagging-Date" not in bag_info:
bag_info["Bagging-Date"] = date.strftime(date.today(), "%Y-%m-%d")
if "Bag-Software-Agent" not in bag_info:
bag_info["Bag-Software-Agent"] = "bagit.py v%s <%s>" % (
VERSION,
PROJECT_URL,
)
bag_info["Payload-Oxum"] = "%s.%s" % (total_bytes, total_files)
_make_tag_file("bag-info.txt", bag_info)

bag_info["Payload-Oxum"] = "%s.%s" % (total_bytes, total_files)
_make_tag_file("bag-info.txt", bag_info)
for c in checksums:
_make_tagmanifest_file(c, bag_dir, encoding="utf-8")

for c in checksums:
_make_tagmanifest_file(c, bag_dir, encoding="utf-8")
except Exception:
LOGGER.exception(_("An error occurred creating a bag in %s"), bag_dir)
raise
Expand Down Expand Up @@ -472,31 +444,7 @@ def save(self, processes=1, manifests=False):
% self.path
)

unbaggable = _can_bag(self.path)
if unbaggable:
LOGGER.error(
_(
"Missing write permissions for the following directories and files:\n%s"
),
unbaggable,
)
raise BagError(_("Missing permissions to move all files and directories"))

unreadable_dirs, unreadable_files = _can_read(self.path)
if unreadable_dirs or unreadable_files:
if unreadable_dirs:
LOGGER.error(
_("The following directories do not have read permissions:\n%s"),
unreadable_dirs,
)
if unreadable_files:
LOGGER.error(
_("The following files do not have read permissions:\n%s"),
unreadable_files,
)
raise BagError(
_("Read permissions are required to calculate file fixities")
)
_check_can_bag(self.path)

# Change working directory to bag directory so helper functions work
old_dir = os.path.abspath(os.path.curdir)
Expand Down Expand Up @@ -1328,47 +1276,69 @@ def _walk(data_dir):
yield path


def _can_bag(test_dir):
"""Scan the provided directory for files which cannot be bagged due to insufficient permissions"""
unbaggable = []
def _check_can_bag(test_dir):
"""
Scan the provided directory to ensure all files and directories can be read
and all directories can be written.

If there are any permission issues, a BagError is raised.
"""
if not os.access(test_dir, os.R_OK):
# We cannot continue without permission to read the source directory
unbaggable.append(test_dir)
return unbaggable
LOGGER.error(_("Cannot read the source directory %s"), test_dir)
raise BagError(
_(
"Cannot read the source directory %s. Read permissions are "
"required to find files"
)
% test_dir
)

if not os.access(test_dir, os.W_OK):
unbaggable.append(test_dir)
LOGGER.error(_("Cannot write to the source directory %s"), test_dir)
raise BagError(
_(
"Cannot write to the source directory %s. Write permissions "
"are required to move files within the bag"
)
% test_dir
)

for dirpath, dirnames, filenames in os.walk(test_dir):
for directory in dirnames:
full_path = os.path.join(dirpath, directory)
if not os.access(full_path, os.W_OK):
unbaggable.append(full_path)
for dirpath, directories, filenames in os.walk(test_dir, topdown=True):
for filename in filenames:
full_path = os.path.join(dirpath, filename)

return unbaggable
if not os.access(full_path, os.R_OK):
LOGGER.error(_("Cannot read the file %s"), full_path)
raise BagError(
_(
"Cannot read the file %s. Read permissions are "
"required to calculate file fixities"
)
% full_path
)

for directory in directories:
full_path = os.path.join(dirpath, directory)

def _can_read(test_dir):
"""
returns ((unreadable_dirs), (unreadable_files))
"""
unreadable_dirs = []
unreadable_files = []
if not os.access(full_path, os.R_OK):
LOGGER.error(_("Cannot read the sub-directory %s"), full_path)
raise BagError(
_(
"Cannot read the sub-directory %s. Read permissions "
"are required to find files"
)
% full_path
)

if not os.access(test_dir, os.R_OK):
unreadable_dirs.append(test_dir)
else:
for dirpath, dirnames, filenames in os.walk(test_dir):
for dn in dirnames:
full_path = os.path.join(dirpath, dn)
if not os.access(full_path, os.R_OK):
unreadable_dirs.append(full_path)
for fn in filenames:
full_path = os.path.join(dirpath, fn)
if not os.access(full_path, os.R_OK):
unreadable_files.append(full_path)
return (tuple(unreadable_dirs), tuple(unreadable_files))
elif not os.access(full_path, os.W_OK):
LOGGER.error(_("Cannot write to the sub-directory %s"), full_path)
raise BagError(
_(
"Cannot write to the sub-directory %s. Write permissions "
"are required to move files within the bag"
)
% full_path
)


def generate_manifest_lines(filename, algorithms=DEFAULT_CHECKSUMS):
Expand Down
42 changes: 31 additions & 11 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,44 +641,62 @@ def test_make_bag_with_unreadable_source(self):
bagit.make_bag(self.tmpdir, checksum=["sha256"])

self.assertEqual(
"Missing permissions to move all files and directories",
"Cannot read the source directory %s. Read permissions are "
"required to find files" % self.tmpdir,
str(error_catcher.exception),
)

def test_make_bag_with_unreadable_subdirectory(self):
# We'll set this write-only to exercise the second permission check in make_bag:
os.chmod(j(self.tmpdir, "loc"), 0o200)
dir_path = j(self.tmpdir, "loc")
os.chmod(dir_path, 0o200)

with self.assertRaises(bagit.BagError) as error_catcher:
bagit.make_bag(self.tmpdir, checksum=["sha256"])

self.assertEqual(
"Read permissions are required to calculate file fixities",
"Cannot read the sub-directory %s. Read permissions are required "
"to find files" % dir_path,
str(error_catcher.exception),
)

def test_make_bag_with_unwritable_source(self):
path_suffixes = ("", "loc")
# 500 is write + execute
os.chmod(self.tmpdir, 0o500)

for path_suffix in reversed(path_suffixes):
os.chmod(j(self.tmpdir, path_suffix), 0o500)
with self.assertRaises(bagit.BagError) as error_catcher:
bagit.make_bag(self.tmpdir, checksum=["sha256"])

self.assertEqual(
"Cannot write to the source directory %s. Write permissions "
"are required to move files within the bag" % self.tmpdir,
str(error_catcher.exception),
)

def test_make_bag_with_unwritable_subdirectory(self):
# 400 is read-only
dir_path = j(self.tmpdir, "loc")
os.chmod(dir_path, 0o400)

with self.assertRaises(bagit.BagError) as error_catcher:
bagit.make_bag(self.tmpdir, checksum=["sha256"])

self.assertEqual(
"Missing permissions to move all files and directories",
"Cannot write to the sub-directory %s. Write permissions are "
"required to move files within the bag" % dir_path,
str(error_catcher.exception),
)

def test_make_bag_with_unreadable_file(self):
os.chmod(j(self.tmpdir, "loc", "2478433644_2839c5e8b8_o_d.jpg"), 0)
file_path = j(self.tmpdir, "loc", "2478433644_2839c5e8b8_o_d.jpg")
os.chmod(file_path, 0)

with self.assertRaises(bagit.BagError) as error_catcher:
bagit.make_bag(self.tmpdir, checksum=["sha256"])

self.assertEqual(
"Read permissions are required to calculate file fixities",
"Cannot read the file %s. Read permissions are required to "
"calculate file fixities" % file_path,
str(error_catcher.exception),
)

Expand Down Expand Up @@ -834,13 +852,15 @@ def test_save_bag_to_unwritable_directory(self):
def test_save_bag_with_unwritable_file(self):
bag = bagit.make_bag(self.tmpdir, checksum=["sha256"])

os.chmod(os.path.join(self.tmpdir, "bag-info.txt"), 0)
file_path = j(self.tmpdir, "bag-info.txt")
os.chmod(file_path, 0)

with self.assertRaises(bagit.BagError) as error_catcher:
bag.save()

self.assertEqual(
"Read permissions are required to calculate file fixities",
"Cannot read the file %s. Read permissions are required to "
"calculate file fixities" % file_path,
str(error_catcher.exception),
)

Expand Down