diff --git a/.gitignore b/.gitignore index 6b483114..4e17c966 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ htmlcov deps venv .vscode/settings.json +debug.py diff --git a/mergin/client.py b/mergin/client.py index 101c62e1..88b03231 100644 --- a/mergin/client.py +++ b/mergin/client.py @@ -17,6 +17,15 @@ import typing import warnings +from mergin.models import ( + ProjectDelta, + ProjectDeltaItemDiff, + ProjectDeltaItem, + ProjectResponse, + ProjectFile, + ProjectWorkspace, +) + from .common import ( ClientError, LoginError, @@ -119,6 +128,7 @@ def __init__( self._user_info = None self._server_type = None self._server_version = None + self._server_features = {} self.client_version = "Python-client/" + __version__ if plugin_version is not None: # this could be e.g. "Plugin/2020.1 QGIS/3.14" self.client_version += " " + plugin_version @@ -309,6 +319,13 @@ def delete(self, path, validate_auth=True): request = urllib.request.Request(url, method="DELETE") return self._do_request(request, validate_auth=validate_auth) + def head(self, path, data=None, headers={}, validate_auth=True): + url = urllib.parse.urljoin(self.url, urllib.parse.quote(path)) + if data: + url += "?" + urllib.parse.urlencode(data) + request = urllib.request.Request(url, headers=headers, method="HEAD") + return self._do_request(request, validate_auth=validate_auth) + def login(self, login, password): """ Authenticate login credentials and store session token @@ -412,6 +429,19 @@ def server_version(self): return self._server_version + def server_features(self): + """ + Returns feature flags of the server. + """ + if self._server_features: + return self._server_features + config = self.server_config() + self._server_features = { + "v2_push_enabled": config.get("v2_push_enabled", False), + "v2_pull_enabled": config.get("v2_pull_enabled", False), + } + return self._server_features + def workspaces_list(self): """ Find all available workspaces @@ -699,6 +729,90 @@ def project_info(self, project_path_or_id, since=None, version=None): resp = self.get("/v1/project/{}".format(project_path_or_id), params) return json.load(resp) + def project_info_v2(self, project_id: str, files_at_version=None) -> ProjectResponse: + """ + Fetch info about project. + + :param project_id: Project's id + :type project_id: String + :param files_at_version: Version to track files at given version + :type files_at_version: String + """ + self.check_v2_project_info_support() + + params = {} + if files_at_version: + params = {"files_at_version": files_at_version} + resp = self.get(f"/v2/projects/{project_id}", params) + resp_json = json.load(resp) + project_workspace = resp_json.get("workspace", {}) + return ProjectResponse( + id=resp_json.get("id"), + name=resp_json.get("name"), + created_at=resp_json.get("created_at"), + updated_at=resp_json.get("updated_at"), + version=resp_json.get("version"), + public=resp_json.get("public"), + role=resp_json.get("role"), + size=resp_json.get("size"), + workspace=ProjectWorkspace( + id=project_workspace.get("id"), + name=project_workspace.get("name"), + ), + files=[ + ProjectFile( + checksum=f.get("checksum"), + mtime=f.get("mtime"), + path=f.get("path"), + size=f.get("size"), + ) + for f in resp_json.get("files", []) + ], + ) + + def get_project_delta(self, project_id: str, since: str, to: typing.Optional[str] = None) -> ProjectDelta: + """ + Fetch info about project delta since given version. 
+
+        :param project_id: Project's id
+        :type project_id: String
+        :param since: Version to track history of files from
+        :type since: String
+        :param to: Optional version to track history of files to, if not given latest version is used
+        :type to: String
+        :rtype: ProjectDelta
+        """
+        # If it is not enabled on the server, raise error
+        if not self.server_features().get("v2_pull_enabled", False):
+            raise ClientError("Project delta is not supported by the server")
+
+        params = {"since": since}
+        if to:
+            params["to"] = to
+        resp = self.get(f"/v2/projects/{project_id}/delta", params)
+        resp_parsed = json.load(resp)
+        return ProjectDelta(
+            to_version=resp_parsed.get("to_version"),
+            items=[
+                ProjectDeltaItem(
+                    path=item["path"],
+                    size=item.get("size"),
+                    checksum=item.get("checksum"),
+                    version=item.get("version"),
+                    change=item.get("change"),
+                    diffs=(
+                        [
+                            ProjectDeltaItemDiff(
+                                id=diff.get("id"),
+                            )
+                            for diff in item.get("diffs", [])
+                        ]
+                    ),
+                )
+                for item in resp_parsed.get("items", [])
+            ],
+        )
+
     def paginated_project_versions(self, project_path, page, per_page=100, descending=False):
         """
         Get records of project's versions (history) using calculated pagination.
@@ -789,11 +903,11 @@ def download_project(self, project_path, directory, version=None):
         :param project_path: Project's full name (<namespace>/<name>)
         :type project_path: String
 
-        :param version: Project version to download, e.g. v42
-        :type version: String
-
         :param directory: Target directory
         :type directory: String
+
+        :param version: Project version to download, e.g. v42
+        :type version: String
         """
         job = download_project_async(self, project_path, directory, version)
         download_project_wait(job)
@@ -1308,13 +1422,21 @@ def check_collaborators_members_support(self):
         if not is_version_acceptable(self.server_version(), f"{min_version}"):
             raise NotImplementedError(f"This needs server at version {min_version} or later")
 
+    def check_v2_project_info_support(self):
+        """
+        Check if the server is compatible with v2 endpoint for project info
+        """
+        min_version = "2025.8.2"
+        if not is_version_acceptable(self.server_version(), f"{min_version}"):
+            raise NotImplementedError(f"This needs server at version {min_version} or later")
+
     def create_user(
         self,
         email: str,
         password: str,
         workspace_id: int,
         workspace_role: WorkspaceRole,
-        username: str = None,
+        username: typing.Optional[str] = None,
         notify_user: bool = False,
     ) -> dict:
         """
diff --git a/mergin/client_pull.py b/mergin/client_pull.py
index 264f4708..a84e16dc 100644
--- a/mergin/client_pull.py
+++ b/mergin/client_pull.py
@@ -17,12 +17,16 @@ import tempfile
 import typing
 import traceback
+from dataclasses import asdict
 
 import concurrent.futures
 
-from .common import CHUNK_SIZE, ClientError
+from .common import CHUNK_SIZE, ClientError, DeltaChangeType, PullActionType
+from .models import ProjectDelta, ProjectDeltaItem, PullAction
 from .merginproject import MerginProject
-from .utils import cleanup_tmp_dir, save_to_file
+from .utils import cleanup_tmp_dir, int_version, save_to_file
+from typing import List, Optional, Tuple
 
 
 # status = download_project_async(...)
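The new client-side entry points above (`server_features()` and `get_project_delta()`) are easiest to understand from a caller's perspective. A minimal sketch, assuming a server with the v2 pull flag enabled — the URL, credentials, project id and version below are placeholders:

```python
from mergin.client import MerginClient

# placeholder server URL and credentials
mc = MerginClient("https://app.merginmaps.com/", login="alice", password="secret")

# feature flags are fetched once from server_config() and cached on the client
if mc.server_features().get("v2_pull_enabled"):
    # placeholder project id; returns a ProjectDelta dataclass rather than raw JSON
    delta = mc.get_project_delta("9f2a7c1e-0000-0000-0000-000000000000", since="v5")
    for item in delta.items:
        # each item carries the change type, file metadata and optional diff file ids
        print(item.path, item.change, [d.id for d in item.diffs])
```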
@@ -76,19 +80,80 @@ def dump(self):
         print("--- END ---")
 
 
-def _download_items(file, directory, diff_only=False):
+class DownloadDiffQueueItem:
+    """Download item representing a diff file to be downloaded using v2 diff/raw endpoint"""
+
+    def __init__(self, file_path, download_file_path):
+        self.file_path = file_path  # relative path to the file within project
+        self.download_file_path = download_file_path  # full path to a temporary file which will receive the content
+        self.size = 0  # size of the item in bytes
+
+    def __repr__(self):
+        return f"<DownloadDiffQueueItem path={self.file_path} size={self.size}>"
+
+    def download_blocking(self, mc, mp):
+        """Starts download and only returns once the file has been fully downloaded and saved"""
+
+        mp.log.debug(f"Downloading diff {self.file_path}")
+        resp = mc.get(
+            f"/v2/projects/{mp.project_id()}/raw/diff/{self.file_path}",
+        )
+        if resp.status in [200, 206]:
+            mp.log.debug(f"Download finished: {self.file_path}")
+            save_to_file(resp, self.download_file_path)
+            self.size = os.path.getsize(self.download_file_path)
+        else:
+            mp.log.error(f"Download failed: {self.file_path}")
+            raise ClientError(f"Failed to download diff file {self.file_path} to {self.download_file_path}")
+
+
+class FileToMerge:
+    """
+    Keeps information about how to create a file (path specified by dest_file) from a couple
+    of downloaded items (chunks) - each item is DownloadQueueItem object which has path
+    to the temporary file containing its data. Calling merge() will create the destination file
+    and remove the temporary files of the chunks
+    """
+
+    def __init__(self, dest_file, downloaded_items, size_check=True):
+        self.dest_file = dest_file  # full path to the destination file to be created
+        self.downloaded_items = downloaded_items  # list of pieces of the destination file to be merged
+        self.size_check = size_check  # whether we want to do merged file size check
+
+    def merge(self):
+        with open(self.dest_file, "wb") as final:
+            for item in self.downloaded_items:
+                with open(item.download_file_path, "rb") as chunk:
+                    shutil.copyfileobj(chunk, final)
+                os.remove(item.download_file_path)
+
+        if not self.size_check:
+            return
+        expected_size = sum(item.size for item in self.downloaded_items)
+        if os.path.getsize(self.dest_file) != expected_size:
+            os.remove(self.dest_file)
+            raise ClientError("Download of file {} failed. 
Please try it again.".format(self.dest_file)) + + +def get_download_items( + file_path: str, + file_size: int, + file_version: str, + download_directory: str, + download_path: Optional[str] = None, + diff_only=False, +): """Returns an array of download queue items""" - file_dir = os.path.dirname(os.path.normpath(os.path.join(directory, file["path"]))) - basename = os.path.basename(file["diff"]["path"]) if diff_only else os.path.basename(file["path"]) - file_size = file["diff"]["size"] if diff_only else file["size"] + file_dir = os.path.dirname(os.path.normpath(os.path.join(download_directory, file_path))) + basename = os.path.basename(download_path) if download_path else os.path.basename(file_path) chunks = math.ceil(file_size / CHUNK_SIZE) items = [] for part_index in range(chunks): download_file_path = os.path.join(file_dir, basename + ".{}".format(part_index)) size = min(CHUNK_SIZE, file_size - part_index * CHUNK_SIZE) - items.append(DownloadQueueItem(file["path"], size, file["version"], diff_only, part_index, download_file_path)) + items.append(DownloadQueueItem(file_path, size, file_version, diff_only, part_index, download_file_path)) return items @@ -100,7 +165,10 @@ def _do_download(item, mc, mp, project_path, job): # TODO: make download_blocking / save_to_file cancellable so that we can cancel as soon as possible - item.download_blocking(mc, mp, project_path) + if isinstance(item, DownloadDiffQueueItem): + item.download_blocking(mc, mp) + else: + item.download_blocking(mc, mp, project_path) job.transferred_size += item.size @@ -167,7 +235,7 @@ def download_project_async(mc, project_path, directory, project_version=None): update_tasks = [] # stuff to do at the end of download for file in project_info["files"]: file["version"] = version - items = _download_items(file, tmp_dir.name) + items = get_download_items(file.get("path"), file.get("size"), file.get("version"), tmp_dir.name) is_latest_version = project_version == latest_proj_info["version"] update_tasks.append(UpdateTask(file["path"], items, latest_version=is_latest_version)) @@ -343,7 +411,7 @@ class PullJob: def __init__( self, project_path, - pull_changes, + pull_actions, total_size, version, files_to_merge, @@ -355,8 +423,8 @@ def __init__( mc, ): self.project_path = project_path - self.pull_changes = ( - pull_changes # dictionary with changes (dict[str, list[dict]] - keys: "added", "updated", ...) + self.pull_actions: Optional[List[PullAction]] = ( + pull_actions # dictionary with changes (dict[str, list[dict]] - keys: "added", "updated", ...) 
) self.total_size = total_size # size of data to download (in bytes) self.transferred_size = 0 @@ -364,13 +432,15 @@ def __init__( self.files_to_merge = files_to_merge # list of FileToMerge instances self.download_queue_items = download_queue_items self.tmp_dir = tmp_dir # TemporaryDirectory instance where we store downloaded files - self.mp = mp # MerginProject instance + self.mp: MerginProject = mp # MerginProject instance self.is_cancelled = False self.project_info = project_info # parsed JSON with project info returned from the server self.basefiles_to_patch = ( basefiles_to_patch # list of tuples (relative path within project, list of diff files in temp dir to apply) ) self.mc = mc + self.futures = [] # list of concurrent.futures.Future instances + self.v2_pull = mc.server_features().get("v2_pull_enabled", False) def dump(self): print("--- JOB ---", self.total_size, "bytes") @@ -385,7 +455,31 @@ def dump(self): print("--- END ---") -def pull_project_async(mc, directory): +def prepare_file_destination(target_dir: str, path: str) -> str: + """Prepares destination path for downloaded files chunks""" + + # figure out destination path for the file + file_dir = os.path.dirname(os.path.normpath(os.path.join(target_dir, path))) + basename = os.path.basename(path) + dest_file_path = os.path.join(file_dir, basename) + os.makedirs(file_dir, exist_ok=True) + return dest_file_path + + +def get_diff_merge_files(delta_item: ProjectDeltaItem, target_dir: str) -> List[FileToMerge]: + """ + Extracts list of diff files to be downloaded from delta item using v1 endpoint. + """ + result = [] + + for diff in delta_item.diffs: + dest_file_path = prepare_file_destination(target_dir, diff.id) + download_items = get_download_items(delta_item.path, diff.size, diff.version, target_dir, diff.id, True) + result.append(FileToMerge(dest_file_path, download_items)) + return result + + +def pull_project_async(mc, directory) -> Optional[PullJob]: """ Starts project pull in background and returns handle to the pending job. Using that object it is possible to watch progress or cancel the ongoing work. @@ -401,106 +495,121 @@ def pull_project_async(mc, directory): raise project_path = mp.project_full_name() + project_id = mp.project_id() local_version = mp.version() mp.log.info("--- version: " + mc.user_agent_info()) mp.log.info(f"--- start pull {project_path}") + delta = None + server_info = None + server_version = None + v2_pull = mc.server_features().get("v2_pull_enabled", False) try: - server_info = mc.project_info(project_path, since=local_version) + if v2_pull: + delta: ProjectDelta = mc.get_project_delta(project_id, since=local_version) + server_version = delta.to_version + else: + server_info = mc.project_info(project_path, since=local_version) + server_version = server_info.get("version") + delta = mp.get_pull_delta(server_info) except ClientError as err: mp.log.error("Error getting project info: " + str(err)) mp.log.info("--- pull aborted") raise - server_version = server_info["version"] - mp.log.info(f"got project info: local version {local_version} / server version {server_version}") + if not delta.items: + mp.log.info("--- pull - nothing to do (no delta changes detected)") + return # Project is up to date if local_version == server_version: mp.log.info("--- pull - nothing to do (already at server version)") return # Project is up to date - - # we either download a versioned file using diffs (strongly preferred), - # but if we don't have history with diffs (e.g. 
uploaded without diffs) - # then we just download the whole file - _pulling_file_with_diffs = lambda f: "diffs" in f and len(f["diffs"]) != 0 + mp.log.info(f"got project versions: local version {local_version} / server version {server_version}") tmp_dir = tempfile.TemporaryDirectory(prefix="mm-pull-") - pull_changes = mp.get_pull_changes(server_info["files"]) - mp.log.debug("pull changes:\n" + pprint.pformat(pull_changes)) - fetch_files = [] - for f in pull_changes["added"]: - f["version"] = server_version - fetch_files.append(f) - # extend fetch files download list with various version of diff files (if needed) - for f in pull_changes["updated"]: - if _pulling_file_with_diffs(f): - for diff in f["diffs"]: - diff_file = copy.deepcopy(f) - for k, v in f["history"].items(): - if "diff" not in v: - continue - if diff == v["diff"]["path"]: - diff_file["version"] = k - diff_file["diff"] = v["diff"] - fetch_files.append(diff_file) - else: - f["version"] = server_version - fetch_files.append(f) - - files_to_merge = [] # list of FileToMerge instances - - for file in fetch_files: - diff_only = _pulling_file_with_diffs(file) - items = _download_items(file, tmp_dir.name, diff_only) - - # figure out destination path for the file - file_dir = os.path.dirname(os.path.normpath(os.path.join(tmp_dir.name, file["path"]))) - basename = os.path.basename(file["diff"]["path"]) if diff_only else os.path.basename(file["path"]) - dest_file_path = os.path.join(file_dir, basename) - os.makedirs(file_dir, exist_ok=True) - files_to_merge.append(FileToMerge(dest_file_path, items)) - - # make sure we can update geodiff reference files (aka. basefiles) with diffs or - # download their full versions so we have them up-to-date for applying changes + # list of FileToMerge instances, which consists of destination path and list of download items + merge_files = [] + diff_files = [] basefiles_to_patch = [] # list of tuples (relative path within project, list of diff files in temp dir to apply) - for file in pull_changes["updated"]: - if not _pulling_file_with_diffs(file): - continue # this is only for diffable files (e.g. geopackages) - - basefile = mp.fpath_meta(file["path"]) - if not os.path.exists(basefile): - # The basefile does not exist for some reason. This should not happen normally (maybe user removed the file - # or we removed it within previous pull because we failed to apply patch the older version for some reason). - # But it's not a problem - we will download the newest version and we're sorted. 
- file_path = file["path"] - mp.log.info(f"missing base file for {file_path} -> going to download it (version {server_version})") - file["version"] = server_version - items = _download_items(file, tmp_dir.name, diff_only=False) - dest_file_path = mp.fpath(file["path"], tmp_dir.name) - # dest_file_path = os.path.join(os.path.dirname(os.path.normpath(os.path.join(temp_dir, file['path']))), os.path.basename(file['path'])) - files_to_merge.append(FileToMerge(dest_file_path, items)) - continue - - basefiles_to_patch.append((file["path"], file["diffs"])) + pull_actions = [] + local_delta = mp.get_local_delta(tmp_dir.name) + # Converting local to PullActions + for item in delta.items: + # find corresponding local delta item + local_item = next((i for i in local_delta if i.path == item.path), None) + local_item_change = local_item.change if local_item else None + + # compare server and local changes to decide what to do in pull + pull_action_type = mp.get_pull_action(item.change, local_item_change) + if not pull_action_type: + continue # no action needed + + pull_action = PullAction(pull_action_type, item, local_item) + if pull_action_type == PullActionType.APPLY_DIFF or ( + pull_action_type == PullActionType.COPY_CONFLICT and item.change == DeltaChangeType.UPDATE_DIFF + ): + basefile = mp.fpath_meta(item.path) + if not os.path.exists(basefile): + # The basefile does not exist for some reason. This should not happen normally (maybe user removed the file + # or we removed it within previous pull because we failed to apply patch the older version for some reason). + # But it's not a problem - we will download the newest version and we're sorted. + mp.log.info(f"missing base file for {item.path} -> going to download it (version {server_version})") + items = get_download_items(item.path, item.size, server_version, tmp_dir.name) + dest_file_path = mp.fpath(item.path, tmp_dir.name) + merge_files.append(FileToMerge(dest_file_path, items)) + + # Force use COPY action to apply the new version instead of trying to apply diffs + # We are not able to get local changes anyway as base file is missing + pull_action.type = PullActionType.COPY_CONFLICT + + # if we have diff to apply, let's download the diff files + # if we have conflict and diff update, download the diff files + elif v2_pull: + diff_files.extend( + [ + DownloadDiffQueueItem(diff_item.id, os.path.join(tmp_dir.name, diff_item.id)) + for diff_item in item.diffs + ] + ) + basefiles_to_patch.append((item.path, [diff.id for diff in item.diffs])) + + else: + # fallback for diff files using v1 endpoint /raw + diff_merge_files = get_diff_merge_files(item, tmp_dir.name) + merge_files.extend(diff_merge_files) + basefiles_to_patch.append((item.path, [diff.id for diff in item.diffs])) + + # let's check the base file existence + elif pull_action_type == PullActionType.COPY or pull_action_type == PullActionType.COPY_CONFLICT: + # simply download the server version of the files + dest_file_path = prepare_file_destination(tmp_dir.name, item.path) + download_items = get_download_items(item.path, item.size, server_version, tmp_dir.name) + merge_files.append(FileToMerge(dest_file_path, download_items)) + + pull_actions.append(pull_action) + # Do nothing for DELETE actions # make a single list of items to download total_size = 0 - download_list = [] - for file_to_merge in files_to_merge: - download_list.extend(file_to_merge.downloaded_items) + download_queue_items = [] + for diff_file in diff_files: + download_queue_items.append(diff_file) + total_size += 
diff_file.size + for file_to_merge in merge_files: + download_queue_items.extend(file_to_merge.downloaded_items) for item in file_to_merge.downloaded_items: total_size += item.size - mp.log.info(f"will download {len(download_list)} chunks, total size {total_size}") + mp.log.info(f"will download {len(download_queue_items)} chunks, total size {total_size}") job = PullJob( project_path, - pull_changes, + pull_actions, total_size, server_version, - files_to_merge, - download_list, + merge_files, + download_queue_items, tmp_dir, mp, server_info, @@ -511,7 +620,7 @@ def pull_project_async(mc, directory): # start download job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) job.futures = [] - for item in download_list: + for item in download_queue_items: future = job.executor.submit(_do_download, item, mc, mp, project_path, job) job.futures.append(future) @@ -524,7 +633,7 @@ def pull_project_wait(job): concurrent.futures.wait(job.futures) -def pull_project_is_running(job): +def pull_project_is_running(job: PullJob): """ Returns true/false depending on whether we have some pending downloads @@ -541,7 +650,7 @@ def pull_project_is_running(job): return False -def pull_project_cancel(job): +def pull_project_cancel(job: PullJob): """ To be called (from main thread) to cancel a job that has downloads in progress. Returns once all background tasks have exited (may block for a bit of time). @@ -554,34 +663,6 @@ def pull_project_cancel(job): cleanup_tmp_dir(job.mp, job.tmp_dir) # delete our temporary dir and all its content -class FileToMerge: - """ - Keeps information about how to create a file (path specified by dest_file) from a couple - of downloaded items (chunks) - each item is DownloadQueueItem object which has path - to the temporary file containing its data. Calling merge() will create the destination file - and remove the temporary files of the chunks - """ - - def __init__(self, dest_file, downloaded_items, size_check=True): - self.dest_file = dest_file # full path to the destination file to be created - self.downloaded_items = downloaded_items # list of pieces of the destination file to be merged - self.size_check = size_check # whether we want to do merged file size check - - def merge(self): - with open(self.dest_file, "wb") as final: - for item in self.downloaded_items: - with open(item.download_file_path, "rb") as chunk: - shutil.copyfileobj(chunk, final) - os.remove(item.download_file_path) - - if not self.size_check: - return - expected_size = sum(item.size for item in self.downloaded_items) - if os.path.getsize(self.dest_file) != expected_size: - os.remove(self.dest_file) - raise ClientError("Download of file {} failed. Please try it again.".format(self.dest_file)) - - def pull_project_finalize(job: PullJob): """ To be called when pull in the background is finished and we need to do the finalization (merge chunks etc.) 
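Since `pull_project_finalize()` is where the downloaded chunks get merged, it is worth recalling the chunk arithmetic from `get_download_items()` that `FileToMerge.merge()` has to undo. A small sketch — the file sizes are made up, and `CHUNK_SIZE` mirrors the constant in `mergin.common`:

```python
import math

CHUNK_SIZE = 100 * 1024 * 1024  # same constant as mergin.common.CHUNK_SIZE

def chunk_sizes(file_size: int):
    # a file becomes ceil(size / CHUNK_SIZE) parts named <basename>.0, <basename>.1, ...
    return [min(CHUNK_SIZE, file_size - i * CHUNK_SIZE) for i in range(math.ceil(file_size / CHUNK_SIZE))]

print(chunk_sizes(250 * 1024 * 1024))  # [104857600, 104857600, 52428800]
print(chunk_sizes(0))                  # [] - zero-byte files produce no download items
```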
@@ -602,6 +683,14 @@ def pull_project_finalize(job: PullJob):
             raise future.exception()
 
     job.mp.log.info("finalizing pull")
+    if not job.project_info and job.v2_pull:
+        project_info_response = job.mc.project_info_v2(job.mp.project_id(), files_at_version=job.version)
+        job.project_info = asdict(project_info_response)
+
+    if not job.project_info:
+        job.mp.log.error("No project info available to finalize pull")
+        job.mp.log.info("--- pull aborted")
+        raise ClientError("No project info available to finalize pull")
 
     # merge downloaded chunks
     try:
@@ -618,7 +707,7 @@ def pull_project_finalize(job: PullJob):
         basefile = job.mp.fpath_meta(file_path)
         server_file = job.mp.fpath(file_path, job.tmp_dir.name)
 
-        shutil.copy(basefile, server_file)
+        job.mp.geodiff.make_copy_sqlite(basefile, server_file)
         diffs = [job.mp.fpath(f, job.tmp_dir.name) for f in file_diffs]
         patch_error = job.mp.apply_diffs(server_file, diffs)
         if patch_error:
@@ -633,14 +722,16 @@ def pull_project_finalize(job: PullJob):
             job.mp.log.info("--- pull aborted")
             os.remove(basefile)
             raise ClientError("Cannot patch basefile {}! Please try syncing again.".format(basefile))
-
+    conflicts = []
+    job.mp.log.info(f"--- applying pull actions {job.pull_actions}")
     try:
-        conflicts = job.mp.apply_pull_changes(job.pull_changes, job.tmp_dir.name, job.project_info, job.mc)
+        if job.pull_actions:
+            conflicts = job.mp.apply_pull_actions(job.pull_actions, job.tmp_dir.name, job.project_info, job.mc)
     except Exception as e:
-        job.mp.log.error("Failed to apply pull changes: " + str(e))
+        job.mp.log.error("Failed to apply pull actions: " + str(e))
         job.mp.log.info("--- pull aborted")
         cleanup_tmp_dir(job.mp, job.tmp_dir)  # delete our temporary dir and all its content
-        raise ClientError("Failed to apply pull changes: " + str(e))
+        raise ClientError("Failed to apply pull actions: " + str(e))
 
     job.mp.update_metadata(job.project_info)
@@ -714,8 +805,16 @@ def download_diffs_async(mc, project_directory, file_path, versions):
     download_list = []  # list of all items to be downloaded
     total_size = 0
     for file in fetch_files:
-        items = _download_items(file, mp.cache_dir, diff_only=True)
-        dest_file_path = mp.fpath_cache(file["diff"]["path"], version=file["version"])
+        diff = file.get("diff")
+        items = get_download_items(
+            file.get("path"),
+            diff["size"],
+            file["version"],
+            mp.cache_dir,
+            download_path=diff.get("path"),
+            diff_only=True,
+        )
+        dest_file_path = mp.fpath_cache(diff["path"], version=file["version"])
         if os.path.exists(dest_file_path):
             continue
         files_to_merge.append(FileToMerge(dest_file_path, items))
@@ -825,7 +924,7 @@ def download_files_async(
         if file["path"] in file_paths:
             index = file_paths.index(file["path"])
             file["version"] = version
-            items = _download_items(file, tmp_dir)
+            items = get_download_items(file["path"], file["size"], version, tmp_dir)
             is_latest_version = version == latest_proj_info["version"]
             task = UpdateTask(file["path"], items, output_paths[index], latest_version=is_latest_version)
             download_list.extend(task.download_queue_items)
diff --git a/mergin/common.py b/mergin/common.py
index b27c95ec..d5f79dff 100644
--- a/mergin/common.py
+++ b/mergin/common.py
@@ -1,5 +1,7 @@
 import os
 from enum import Enum
+from typing import List, Dict, Any
+from dataclasses import dataclass, field
 
 CHUNK_SIZE = 100 * 1024 * 1024
@@ -94,3 +96,17 @@ class ProjectRole(Enum):
     EDITOR = "editor"
     WRITER = "writer"
     OWNER = "owner"
+
+
+class DeltaChangeType(Enum):
+    CREATE = "create"
+    UPDATE = "update"
+    DELETE = "delete"
+    UPDATE_DIFF = "update_diff"
+
+
+class 
PullActionType(Enum): + COPY = "copy" + COPY_CONFLICT = "copy_conflict" + APPLY_DIFF = "apply_diff" + DELETE = "delete" diff --git a/mergin/merginproject.py b/mergin/merginproject.py index 90136516..90277d06 100644 --- a/mergin/merginproject.py +++ b/mergin/merginproject.py @@ -4,14 +4,17 @@ import os import re import shutil +from typing import List, Dict, Any, Optional import uuid import tempfile from datetime import datetime from dateutil.tz import tzlocal +from dataclasses import asdict from .editor import prevent_conflicted_copy -from .common import UPLOAD_CHUNK_SIZE, InvalidProject, ClientError +from .common import UPLOAD_CHUNK_SIZE, DeltaChangeType, InvalidProject, ClientError, PullActionType +from .models import ProjectDelta, ProjectDeltaItem, ProjectDeltaItemDiff, PullAction from .utils import ( generate_checksum, is_versioned_file, @@ -141,9 +144,11 @@ def project_full_name(self) -> str: """Returns fully qualified project name: /""" self._read_metadata() if self.is_old_metadata: - return self._metadata["name"] - else: - return f"{self._metadata['namespace']}/{self._metadata['name']}" + return self._metadata.get("name") + workspace = self._metadata.get("workspace", {}) + if workspace: + return f"{workspace.get('name')}/{self._metadata['name']}" + return f"{self._metadata.get('namespace')}/{self._metadata['name']}" def project_name(self) -> str: """Returns only project name, without its workspace name""" @@ -189,7 +194,10 @@ def workspace_id(self) -> int: "The project directory has been created with an old version of the Mergin Maps client. " "Please delete the project directory and re-download the project." ) - return self._metadata["workspace_id"] + workspace = self._metadata.get("workspace", {}) + if workspace: + return workspace.get("id") + return self._metadata.get("workspace_id") def version(self) -> str: """Returns project version (e.g. "v123")""" @@ -349,13 +357,12 @@ def compare_file_sets(self, origin, current): return {"renamed": [], "added": added, "removed": removed, "updated": updated} - def get_pull_changes(self, server_files): + def get_pull_changes(self, server_files: List[dict]) -> dict: """ Calculate changes needed to be pulled from server. Calculate diffs between local files metadata and server's ones. Because simple metadata like file size or checksum are not enough to determine geodiff files changes, evaluate also their history (provided by server). - For small files ask for full versions of geodiff files, otherwise determine list of diffs needed to update file. .. seealso:: self.compare_file_sets @@ -405,6 +412,231 @@ def get_pull_changes(self, server_files): changes["updated"] = [f for f in changes["updated"] if f not in not_updated] return changes + def get_pull_delta(self, server_info: dict) -> ProjectDelta: + """ + Calculate delta needed to be pulled from server. + + Calculate diffs between local files metadata and server's ones. Because simple metadata like file size or + checksum are not enough to determine geodiff files changes, evaluate also their history (provided by server). + + This method is similar to get_pull_changes but returns ProjectDelta object instead of raw dict. + + .. seealso:: self.compare_file_sets + .. seealso:: ProjectDelta + .. 
seealso:: self.get_pull_changes + + :param server_info: project metadata, + self.project_info(project_path, since=v1)) + :type server_info: dict + :returns: changes metadata for files to be pulled from server + :rtype: ProjectDelta + """ + + # first let's have a look at the added/updated/removed files + result = [] + server_files = server_info.get("files", []) + server_version = server_info.get("version", "") + changes = self.compare_file_sets(self.files(), server_files) + + added = changes.get("added", []) + removed = changes.get("removed", []) + updated = changes.get("updated", []) + for change in added: + result.append( + ProjectDeltaItem( + change=DeltaChangeType.CREATE, + path=change["path"], + size=change["size"], + checksum=change["checksum"], + version=server_version, + ) + ) + for change in removed: + result.append( + ProjectDeltaItem( + change=DeltaChangeType.DELETE, + path=change["path"], + size=change["size"], + checksum=change["checksum"], + version=server_version, + ) + ) + + # then let's inspect our versioned files (geopackages) if there are any relevant changes + for change in updated: + path = change.get("path") + if not self.is_versioned_file(path): + result.append( + ProjectDeltaItem( + change=DeltaChangeType.UPDATE, + path=path, + size=change["size"], + checksum=change["checksum"], + version=server_version, + ) + ) + continue + + diffs = [] + is_updated = False + + # get sorted list of the history (they may not be sorted or using lexical sorting - "v10", "v11", "v5", "v6", ...) + history_list = [] + for version_str, version_info in change["history"].items(): + history_list.append((int_version(version_str), version_info)) + history_list = sorted(history_list, key=lambda item: item[0]) # sort tuples based on version numbers + + # need to track geodiff file history to see if there were any changes + for version, version_info in history_list: + if version <= int_version(self.version()): + continue # ignore history of no interest + is_updated = True + diff = version_info.get("diff") + if diff: + size = diff.get("size") + diffs.append(ProjectDeltaItemDiff(id=diff.get("path"), version=f"v{version}", size=size)) + else: + diffs = [] + break # we found force update in history, does not make sense to download diffs + + if is_updated: + result.append( + ProjectDeltaItem( + change=DeltaChangeType.UPDATE_DIFF if diffs else DeltaChangeType.UPDATE, + path=path, + size=change["size"], + checksum=change["checksum"], + version=server_version, + diffs=diffs, + ) + ) + return ProjectDelta(to_version=server_version, items=result) + + def get_pull_action( + self, server_change: DeltaChangeType, local_change: Optional[DeltaChangeType] = None + ) -> Optional[PullActionType]: + """ + Determine pull actions for files by comparing server_change and local_change. 
+ """ + # FATAL combinations + if (server_change, local_change) in [ + (DeltaChangeType.CREATE, DeltaChangeType.UPDATE), + (DeltaChangeType.CREATE, DeltaChangeType.DELETE), + (DeltaChangeType.CREATE, DeltaChangeType.UPDATE_DIFF), + (DeltaChangeType.UPDATE, DeltaChangeType.CREATE), + (DeltaChangeType.UPDATE_DIFF, DeltaChangeType.CREATE), + (DeltaChangeType.DELETE, DeltaChangeType.CREATE), + ]: + self.log.critical(f"Invalid combination of changes: server {server_change}, local {local_change}") + raise ClientError(f"Invalid combination of changes: server {server_change}, local {local_change}") + + pull_action_map = { + (DeltaChangeType.CREATE, None): PullActionType.COPY, + (DeltaChangeType.CREATE, DeltaChangeType.CREATE): PullActionType.COPY_CONFLICT, + (DeltaChangeType.UPDATE, None): PullActionType.COPY, + (DeltaChangeType.UPDATE, DeltaChangeType.UPDATE): PullActionType.COPY_CONFLICT, + (DeltaChangeType.UPDATE, DeltaChangeType.DELETE): PullActionType.COPY, + (DeltaChangeType.UPDATE, DeltaChangeType.UPDATE_DIFF): PullActionType.COPY_CONFLICT, + (DeltaChangeType.UPDATE_DIFF, None): PullActionType.APPLY_DIFF, # without rebase + (DeltaChangeType.UPDATE_DIFF, DeltaChangeType.UPDATE): PullActionType.COPY_CONFLICT, + (DeltaChangeType.UPDATE_DIFF, DeltaChangeType.DELETE): PullActionType.COPY, + (DeltaChangeType.UPDATE_DIFF, DeltaChangeType.UPDATE_DIFF): PullActionType.APPLY_DIFF, # rebase + (DeltaChangeType.DELETE, None): PullActionType.DELETE, + (DeltaChangeType.DELETE, DeltaChangeType.UPDATE): None, + (DeltaChangeType.DELETE, DeltaChangeType.DELETE): None, + (DeltaChangeType.DELETE, DeltaChangeType.UPDATE_DIFF): None, + } + + return pull_action_map.get((server_change, local_change)) + + def get_local_delta(self, diff_directory: str) -> List[ProjectDeltaItem]: + """ + Calculate local delta needed to be pushed to server. + + Calculate diffs between local files metadata and actual files in project directory. Because simple metadata like + file size or checksum are not enough to determine geodiff files changes, geodiff tool is used to determine change + of file content and update corresponding metadata. + + .. 
seealso:: self.compare_file_sets + + :param diff_directory: directory where temporary diff files can be stored + :returns: delta items needed for files needed to be applied on a server + :rtype: List[ProjectDeltaItem] + """ + result = [] + changes = self.compare_file_sets(self.files(), self.inspect_files()) + added = changes.get("added", []) + removed = changes.get("removed", []) + updated = changes.get("updated", []) + + for change in added: + result.append( + ProjectDeltaItem( + change=DeltaChangeType.CREATE, + path=change.get("path"), + size=change.get("size"), + checksum=change.get("checksum"), + version="", + ) + ) + for change in removed: + result.append( + ProjectDeltaItem( + change=DeltaChangeType.DELETE, + path=change.get("path"), + size=change.get("size"), + checksum=change.get("checksum"), + version="", + ) + ) + + for change in updated: + path = change.get("path") + if not self.is_versioned_file(path): + result.append( + ProjectDeltaItem( + change=DeltaChangeType.UPDATE, + path=change.get("path"), + size=change.get("size"), + checksum=change.get("checksum"), + version="", + ) + ) + continue + # we use geodiff to check if we geopackage is diffable + current_file = self.fpath(path) + origin_file = self.fpath_meta(path) + diff_id = str(uuid.uuid4()) + diff_name = path + "-diff-" + diff_id + diff_file = os.path.join(diff_name) + delta_item = ProjectDeltaItem( + change=DeltaChangeType.UPDATE, + path=path, + size=change["size"], + checksum=change["checksum"], + version="", + ) + checkpoint_size, checkpoint_checksum = do_sqlite_checkpoint(path, self.log) + if checkpoint_size and checkpoint_checksum: + delta_item.size = checkpoint_size + delta_item.checksum = checkpoint_checksum + + try: + diff_location = self.fpath(diff_file, diff_directory) + self.geodiff.create_changeset(origin_file, current_file, diff_location) + if self.geodiff.has_changes(diff_location): + delta_item.checksum = change.get("origin_checksum") + delta_item.change = DeltaChangeType.UPDATE_DIFF + os.remove(diff_location) + except (pygeodiff.GeoDiffLibError, pygeodiff.GeoDiffLibConflictError) as e: + self.log.warning("failed to create changeset for " + path) + # probably the database schema has been modified if geodiff cannot create changeset. + # we will need to do full upload of the file + pass + result.append(delta_item) + + return result + def get_push_changes(self): """ Calculate changes needed to be pushed to server. @@ -499,21 +731,17 @@ def get_list_of_push_changes(self, push_changes): pass return changes - def apply_pull_changes(self, changes, temp_dir, server_project, mc): + def apply_pull_actions(self, actions: List[PullAction], download_dir: str, server_project: dict, mc) -> List[str]: """ - Apply changes pulled from server. + Apply pull actions for files. - Update project files according to file changes. Apply changes to geodiff basefiles as well + Update project files according to pull action. Apply changes to geodiff basefiles as well so they are up to date with server. In case of conflicts create backups from locally modified versions. - .. 
seealso:: self.get_pull_changes - - :param changes: metadata for pulled files - :type changes: dict[str, list[dict]] - :param temp_dir: directory with downloaded files from server - :type temp_dir: str - :param user_name: name of the user that is pulling the changes - :type user_name: str + :param actions: list of pull actions + :type actions: List[PullAction] + :param download_dir: directory with downloaded files from server + :type download_dir: str :param server_project: project metadata from the server :type server_project: dict :param mc: mergin client @@ -522,55 +750,53 @@ def apply_pull_changes(self, changes, temp_dir, server_project, mc): :rtype: list[str] """ conflicts = [] - local_changes = self.get_push_changes() - modified_local_paths = [f["path"] for f in local_changes.get("added", []) + local_changes.get("updated", [])] - - local_files_map = {} - for f in self.inspect_files(): - local_files_map.update({f["path"]: f}) - for k, v in changes.items(): - for item in v: - path = item["path"] - src = self.fpath(path, temp_dir) - dest = self.fpath(path) - basefile = self.fpath_meta(path) - - # special care is needed for geodiff files - # 'src' here is server version of file and 'dest' is locally modified - if self.is_versioned_file(path) and k == "updated": - if path in modified_local_paths: - conflict = self.update_with_rebase(path, src, dest, basefile, temp_dir, mc.username()) - if conflict: - conflicts.append(conflict) - else: - # The local file is not modified -> no rebase needed. - # We just apply the diff between our copy and server to both the local copy and its basefile - self.update_without_rebase(path, src, dest, basefile, temp_dir) + for action in actions: + path = action.pull_delta_item.path + server = self.fpath(path, download_dir) + live = self.fpath(path) + basefile = self.fpath_meta(path) + action_type = action.type + pull_change = action.pull_delta_item.change + local_change = action.local_delta_item.change if action.local_delta_item else None + if action_type == PullActionType.COPY: + # simply copy the file from server + if is_versioned_file(path): + self.geodiff.make_copy_sqlite(server, live) + self.geodiff.make_copy_sqlite(server, basefile) else: - # creating conflicted copy if both server and local changes are present on the files - if ( - path in modified_local_paths - and item["checksum"] != local_files_map[path]["checksum"] - and not prevent_conflicted_copy(path, mc, server_project) - ): - conflict = self.create_conflicted_copy(path, mc.username()) + shutil.copy(server, live) + + elif action_type == PullActionType.APPLY_DIFF: + # rebase needed only if both server and local changes are diffs + if pull_change == DeltaChangeType.UPDATE_DIFF and local_change == DeltaChangeType.UPDATE_DIFF: + conflict = self.update_with_rebase(path, server, live, basefile, download_dir, mc.username()) + if conflict: conflicts.append(conflict) + else: + # no rebase needed, just apply the diff + self.update_without_rebase(path, server, live, basefile, download_dir) + + elif action_type == PullActionType.COPY_CONFLICT and not prevent_conflicted_copy(path, mc, server_project): + conflict = self.create_conflicted_copy(path, mc.username()) + conflicts.append(conflict) + if self.is_versioned_file(path): + try: + self.geodiff.make_copy_sqlite(server, live) + self.geodiff.make_copy_sqlite(server, basefile) + except pygeodiff.GeoDiffLibError: + self.log.info("failed to create SQLite copy for file: " + path) + # create unfinished pull copy instead + f_server_unfinished = 
self.fpath_unfinished_pull(path) + self.geodiff.make_copy_sqlite(server, f_server_unfinished) + else: + shutil.copy(server, live) - if k == "removed": - if os.path.exists(dest): - os.remove(dest) - else: - # the file could be deleted via web interface AND also manually locally -> just log it - self.log.warning(f"File to be removed locally doesn't exist: {dest}") - if self.is_versioned_file(path): - os.remove(basefile) - else: - if self.is_versioned_file(path): - self.geodiff.make_copy_sqlite(src, dest) - self.geodiff.make_copy_sqlite(src, basefile) - else: - shutil.copy(src, dest) + elif action_type == PullActionType.DELETE: + # remove local file + os.remove(live) + if self.is_versioned_file(path): + os.remove(basefile) return conflicts @@ -719,7 +945,7 @@ def apply_push_changes(self, changes): else: pass - def create_conflicted_copy(self, file, user_name): + def create_conflicted_copy(self, file: str, user_name: str): """ Create conflicted copy file next to its origin. diff --git a/mergin/models.py b/mergin/models.py new file mode 100644 index 00000000..415aef31 --- /dev/null +++ b/mergin/models.py @@ -0,0 +1,95 @@ +from dataclasses import dataclass, field +from typing import List, Optional +from .common import DeltaChangeType, PullActionType + + +@dataclass +class ProjectDeltaItemDiff: + """ + Single diff file info. + """ + + id: str + size: int = 0 + version: Optional[str] = None + + +@dataclass +class ProjectDeltaItem: + """ + Single file change presented in project delta items. + """ + + change: DeltaChangeType + path: str + version: str + size: int + checksum: str + diffs: List[ProjectDeltaItemDiff] = field(default_factory=list) + + def __post_init__(self): + self.change = DeltaChangeType(self.change) + + +@dataclass +class ProjectDelta: + """ + Structure for project delta (changes between versions). + """ + + to_version: str + items: List[ProjectDeltaItem] = field(default_factory=list) + + +@dataclass +class PullAction: + """ + Action to be performed during pull. + """ + + type: PullActionType + pull_delta_item: ProjectDeltaItem + local_delta_item: Optional[ProjectDeltaItem] = None + + def __post_init__(self): + self.type = PullActionType(self.type) + + +@dataclass +class ProjectFile: + """ + File info in project response. + """ + + checksum: str + mtime: str + path: str + size: int + + +@dataclass +class ProjectWorkspace: + """ + Workspace info in project response. + """ + + id: int + name: str + + +@dataclass +class ProjectResponse: + """ + Project info response. 
+ """ + + created_at: str + files: List[ProjectFile] + id: str + name: str + public: bool + role: str + size: int + updated_at: str + version: str + workspace: ProjectWorkspace diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index 07cdeec6..c5893104 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -31,6 +31,9 @@ download_project_finalize, download_project_is_running, download_project_cancel, + pull_project_async, + pull_project_finalize, + pull_project_wait, ) from ..utils import ( generate_checksum, @@ -42,7 +45,7 @@ from ..merginproject import pygeodiff from ..report import create_report from ..editor import EDITOR_ROLE_NAME, filter_changes, is_editor_enabled -from ..common import ErrorCode, WorkspaceRole, ProjectRole +from ..common import DeltaChangeType, ErrorCode, WorkspaceRole, ProjectRole SERVER_URL = os.environ.get("TEST_MERGIN_URL") API_USER = os.environ.get("TEST_API_USERNAME") @@ -261,19 +264,26 @@ def test_create_remote_project_from_local(mc): # check basic metadata about created project project_info = mc.project_info(project) - assert project_info["version"] == "v1" + assert project_info["version"] == source_mp.version() assert project_info["name"] == test_project assert project_info["namespace"] == API_USER assert project_info["id"] == source_mp.project_id() # check project metadata retrieval by id project_info = mc.project_info(source_mp.project_id()) - assert project_info["version"] == "v1" + assert project_info["version"] == source_mp.version() assert project_info["name"] == test_project assert project_info["namespace"] == API_USER assert project_info["id"] == source_mp.project_id() - version = mc.project_version_info(project_info.get("id"), "v1") + # check basic metadata about created project + project_info = mc.project_info_v2(project_info.get("id")) + assert project_info.version == source_mp.version() + assert project_info.name == test_project + assert project_info.workspace.name == API_USER + assert project_info.id == source_mp.project_id() + + version = mc.project_version_info(project_info.id, "v1") assert version["name"] == "v1" assert any(f for f in version["changes"]["added"] if f["path"] == "test.qgs") @@ -357,6 +367,20 @@ def test_push_pull_changes(mc): updated_file = [f for f in project_version["changes"]["updated"] if f["path"] == f_updated][0] assert "origin_checksum" not in updated_file # internal client info + project_info = mc.project_info(mp.project_id(), mp.version()) + assert project_info["version"] == "v2" + assert not next((f for f in project_info["files"] if f["path"] == f_removed), None) + assert not next((f for f in project_info["files"] if f["path"] == f_renamed), None) + assert next((f for f in project_info["files"] if f["path"] == "renamed.txt"), None) + assert next((f for f in project_info["files"] if f["path"] == f_added), None) + f_remote_checksum = next((f["checksum"] for f in project_info["files"] if f["path"] == f_updated), None) + assert generate_checksum(os.path.join(project_dir, f_updated)) == f_remote_checksum + assert project_info["id"] == mp.project_id() + assert len(project_info["files"]) == len(mp.inspect_files()) + project_version = mc.project_version_info(project_info["id"], "v2") + updated_file = [f for f in project_version["changes"]["updated"] if f["path"] == f_updated][0] + assert "origin_checksum" not in updated_file # internal client info + # test parallel changes with open(os.path.join(project_dir_2, f_updated), "w") as f: f.write("Make some conflict") @@ -485,6 +509,7 @@ def 
test_sync_diff(mc): f_renamed = "test_dir/modified_1_geom.gpkg" shutil.move(mp.fpath(f_renamed), mp.fpath("renamed.gpkg")) mc.push_project(project_dir) + mp = MerginProject(project_dir) # check project after push project_info = mc.project_info(project) @@ -497,6 +522,17 @@ def test_sync_diff(mc): assert "diff" in f_remote assert os.path.exists(mp.fpath_meta("renamed.gpkg")) + # check project after push + + project_info = mc.project_info_v2(mp.project_id(), mp.version()) + assert project_info.version == "v3" + assert project_info.id == mp.project_id() + f_remote = next((f for f in project_info.files if f.path == f_updated), None) + assert next((f for f in project_info.files if f.path == "renamed.gpkg"), None) + assert not next((f for f in project_info.files if f.path == f_removed), None) + assert not os.path.exists(mp.fpath_meta(f_removed)) + assert os.path.exists(mp.fpath_meta("renamed.gpkg")) + # pull project in different directory mp2 = MerginProject(project_dir_2) mc.pull_project(project_dir_2) @@ -654,7 +690,9 @@ def test_missing_basefile_pull(mc): # try to sync again -- it should not crash mc.pull_project(project_dir) + assert os.path.exists(os.path.join(project_dir, ".mergin", "base.gpkg")) mc.push_project(project_dir) + # check if base file exists again def test_empty_file_in_subdir(mc): @@ -2397,51 +2435,6 @@ def test_reset_local_changes(mc: MerginClient): assert len(push_changes["updated"]) == 0 -def test_project_metadata(mc): - test_project = "test_project_metadata" - project = API_USER + "/" + test_project - project_dir = os.path.join(TMP_DIR, test_project) - - cleanup(mc, project, [project_dir]) - - # prepare local project - shutil.copytree(TEST_DATA_DIR, project_dir) - - # copy metadata in old format - os.makedirs(os.path.join(project_dir, ".mergin"), exist_ok=True) - metadata_file = os.path.join(project_dir, "old_metadata.json") - # rewrite metadata nemespace to prevent failing tests with other user than test_plugin - with open(metadata_file, "r") as f: - metadata = json.load(f) - metadata["name"] = f"{API_USER}/{test_project}" - project_metadata_file = os.path.join(project_dir, ".mergin", "mergin.json") - with open(project_metadata_file, "w") as f: - json.dump(metadata, f, indent=2) - - # verify we have correct metadata - mp = MerginProject(project_dir) - assert mp.project_full_name() == f"{API_USER}/{test_project}" - assert mp.project_name() == test_project - assert mp.workspace_name() == API_USER - assert mp.version() == "v0" - - # copy metadata in new format - metadata_file = os.path.join(project_dir, "new_metadata.json") - # rewrite metadata nemespace to prevent failing tests with other user than test_plugin - with open(metadata_file, "r") as f: - metadata = json.load(f) - metadata["namespace"] = API_USER - with open(project_metadata_file, "w") as f: - json.dump(metadata, f, indent=2) - - # verify we have correct metadata - mp = MerginProject(project_dir) - assert mp.project_full_name() == f"{API_USER}/{test_project}" - assert mp.project_name() == test_project - assert mp.workspace_name() == API_USER - assert mp.version() == "v0" - - def test_project_rename(mc: MerginClient): """Check project can be renamed""" @@ -3026,3 +3019,58 @@ def test_server_type(mc): mock_client_get.side_effect = ClientError(detail="Service unavailable", http_error=503) with pytest.raises(ClientError, match="Service unavailable"): mc.server_type() + + +def test_pull_project(mc: MerginClient, mc2: MerginClient): + """Test pull_project method""" + test_project = "test_pull_project_created" + 
test_project_to_pull = "test_pull_project_created_2" + project = API_USER + "/" + test_project + project_dir = os.path.join(TMP_DIR, test_project) + project_dir_to_pull = os.path.join(TMP_DIR, test_project_to_pull) + project_dir_to_pull_v1 = os.path.join(TMP_DIR, test_project_to_pull + "_v1") + + cleanup(mc, project, [project_dir, project_dir_to_pull, project_dir_to_pull_v1]) + + mp = create_versioned_project(mc, test_project, project_dir, "base.gpkg", remove=False) + # download project to another dir + mc.download_project(project, project_dir_to_pull, version="v0") + mp_to_pull = MerginProject(project_dir_to_pull) + + if mc.server_features() and mc.server_features().get("v2_pull_enabled"): + delta = mc.get_project_delta(mp_to_pull.project_id(), since=mp_to_pull.version()) + assert delta.items + assert delta.to_version == mp.version() + job = pull_project_async(mc, project_dir_to_pull) + assert len(job.download_queue_items) == len(delta.items) - 1 # excluding .qgs zero size file + assert os.path.exists(job.tmp_dir.name) + pull_project_wait(job) + # check project info after pull + project_info = mc.project_info_v2(mp_to_pull.project_id(), files_at_version=mp.version()) + pull_project_finalize(job) + mp_to_pull = MerginProject(project_dir_to_pull) + assert not os.path.exists(job.tmp_dir.name) + assert mp_to_pull.version() == mp.version() + assert mp_to_pull.project_id() == mp.project_id() + assert len(project_info.files) == len(mp.files()) + for item in delta.items: + assert os.path.exists(mp_to_pull.fpath(item.path)) + assert os.path.exists(mp_to_pull.fpath_meta("base.gpkg")) + + mc._server_features = {"v2_pull_enabled": False} # force disable v2 pull + mc.download_project(project, project_dir_to_pull_v1, version="v0") + job = pull_project_async(mc, project_dir_to_pull_v1) + assert os.path.exists(job.tmp_dir.name) + pull_project_wait(job) + # check project info after pull + project_info = mc.project_info(mp_to_pull.project_full_name()) + pull_project_finalize(job) + mp_to_pull = MerginProject(project_dir_to_pull_v1) + assert not os.path.exists(job.tmp_dir.name) + assert mp_to_pull.version() == mp.version() + assert mp_to_pull.project_id() == mp.project_id() + assert len(project_info.get("files")) == len(mp.files()) + delta = mp.get_pull_delta({"files": mp_to_pull.files(), "version": mp_to_pull.version()}) + for item in [item for item in delta.items if item.change == DeltaChangeType.CREATE]: + assert os.path.exists(mp_to_pull.fpath(item.path)) + assert os.path.exists(mp_to_pull.fpath_meta("base.gpkg")) diff --git a/mergin/test/test_client_pull.py b/mergin/test/test_client_pull.py new file mode 100644 index 00000000..af0d6cf4 --- /dev/null +++ b/mergin/test/test_client_pull.py @@ -0,0 +1,90 @@ +import os +import tempfile +import pytest +from mergin.common import DeltaChangeType, CHUNK_SIZE +from mergin.models import ProjectDeltaItem, ProjectDeltaItemDiff +from mergin.client_pull import prepare_file_destination, get_diff_merge_files, get_download_items + + +def test_prepare_file_destination(): + with tempfile.TemporaryDirectory() as tmp_dir: + path = "subdir/file.txt" + dest = prepare_file_destination(tmp_dir, path) + + expected_dir = os.path.join(tmp_dir, "subdir") + expected_path = os.path.join(expected_dir, "file.txt") + + assert dest == expected_path + assert os.path.exists(expected_dir) + assert os.path.isdir(expected_dir) + assert not os.path.exists(expected_path) # file should not exist yet + + +def test_get_diff_merge_files(): + with tempfile.TemporaryDirectory() as tmp_dir: + item = 
ProjectDeltaItem(
+            change=DeltaChangeType.UPDATE_DIFF,
+            path="data.gpkg",
+            version="v3",
+            size=300,
+            checksum="789",
+            diffs=[
+                ProjectDeltaItemDiff(id="diff2", size=20, version="v2"),
+            ],
+        )
+
+        merge_files = get_diff_merge_files(item, tmp_dir)
+
+        assert len(merge_files) == 1
+
+        # Check diff
+        f2 = merge_files[0]
+        assert f2.dest_file == os.path.join(tmp_dir, "diff2")
+        assert len(f2.downloaded_items) == 1
+        assert f2.downloaded_items[0].file_path == "data.gpkg"
+        assert f2.downloaded_items[0].size == 20
+        assert f2.downloaded_items[0].version == "v2"
+
+
+def test_get_download_items():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        # Case 1: Small file (one chunk)
+        items = get_download_items("small.txt", 100, "v1", tmp_dir)
+        assert len(items) == 1
+        assert items[0].file_path == "small.txt"
+        assert items[0].size == 100
+        assert items[0].part_index == 0
+        assert items[0].download_file_path == os.path.join(tmp_dir, "small.txt.0")
+
+        # Case 2: Large file (multiple chunks)
+        file_size = int(CHUNK_SIZE * 2.5)
+        items = get_download_items("large.txt", file_size, "v1", tmp_dir)
+        assert len(items) == 3
+
+        # Chunk 0
+        assert items[0].size == CHUNK_SIZE
+        assert items[0].part_index == 0
+        assert items[0].download_file_path == os.path.join(tmp_dir, "large.txt.0")
+
+        # Chunk 1
+        assert items[1].size == CHUNK_SIZE
+        assert items[1].part_index == 1
+        assert items[1].download_file_path == os.path.join(tmp_dir, "large.txt.1")
+
+        # Chunk 2
+        assert items[2].size == int(CHUNK_SIZE * 0.5)
+        assert items[2].part_index == 2
+        assert items[2].download_file_path == os.path.join(tmp_dir, "large.txt.2")
+
+        # Case 3: Empty file
+        items = get_download_items("empty.txt", 0, "v1", tmp_dir)
+        assert len(items) == 0
+
+        # Case 4: Diff only
+        items = get_download_items("base.gpkg", 50, "v1", tmp_dir, download_path="diff_file", diff_only=True)
+        assert len(items) == 1
+        assert items[0].diff_only is True
+        assert items[0].file_path == "base.gpkg"
+        assert items[0].size == 50
+        assert items[0].download_file_path == os.path.join(tmp_dir, "diff_file.0")
diff --git a/mergin/test/test_data/v2_metadata.json b/mergin/test/test_data/v2_metadata.json
new file mode 100644
index 00000000..f285437e
--- /dev/null
+++ b/mergin/test/test_data/v2_metadata.json
@@ -0,0 +1,34 @@
+{
+  "created_at": "2025-12-04T15:51:55Z",
+  "files": [
+    {
+      "checksum": "22afdbbf504087ea0ff7f1a2aeca2e265cc01bb5",
+      "mtime": "2025-12-04T15:52:09Z",
+      "path": "qgs_project.qgs",
+      "size": 72024
+    },
+    {
+      "checksum": "7f46f5d1a160a71aff5450f55853b0d552958f58",
+      "mtime": "2025-12-04T15:52:09Z",
+      "path": "photo.jpg",
+      "size": 5646973
+    },
+    {
+      "checksum": "3a8c54469e4fe498faffe66f4671fb9b0e6c0221",
+      "mtime": "2025-12-04T15:54:21Z",
+      "path": "data.gpkg",
+      "size": 126976
+    }
+  ],
+  "id": "e5cd45e5-8250-422a-ac16-e8c771689fd8",
+  "name": "diff_8dfb",
+  "public": false,
+  "role": "owner",
+  "size": 5845973,
+  "updated_at": "2025-12-04T15:54:21Z",
+  "version": "v101",
+  "workspace": {
+    "id": 1,
+    "name": "my-workspace"
+  }
+}
diff --git a/mergin/test/test_mergin_project.py b/mergin/test/test_mergin_project.py
new file mode 100644
index 00000000..ef371897
--- /dev/null
+++ b/mergin/test/test_mergin_project.py
@@ -0,0 +1,455 @@
+import os
+import shutil
+import json
+import tempfile
+import pytest
+from mergin.merginproject import MerginProject
+from mergin.common import DeltaChangeType, PullActionType, ClientError
+from mergin.models import ProjectDeltaItem, ProjectDeltaItemDiff
+from mergin.client_pull import PullAction
diff --git a/mergin/test/test_data/v2_metadata.json b/mergin/test/test_data/v2_metadata.json
new file mode 100644
index 00000000..f285437e
--- /dev/null
+++ b/mergin/test/test_data/v2_metadata.json
@@ -0,0 +1,34 @@
+{
+  "created_at": "2025-12-04T15:51:55Z",
+  "files": [
+    {
+      "checksum": "22afdbbf504087ea0ff7f1a2aeca2e265cc01bb5",
+      "mtime": "2025-12-04T15:52:09Z",
+      "path": "qgs_project.qgs",
+      "size": 72024
+    },
+    {
+      "checksum": "7f46f5d1a160a71aff5450f55853b0d552958f58",
+      "mtime": "2025-12-04T15:52:09Z",
+      "path": "photo.jpg",
+      "size": 5646973
+    },
+    {
+      "checksum": "3a8c54469e4fe498faffe66f4671fb9b0e6c0221",
+      "mtime": "2025-12-04T15:54:21Z",
+      "path": "data.gpkg",
+      "size": 126976
+    }
+  ],
+  "id": "e5cd45e5-8250-422a-ac16-e8c771689fd8",
+  "name": "diff_8dfb",
+  "public": false,
+  "role": "owner",
+  "size": 5845973,
+  "updated_at": "2025-12-04T15:54:21Z",
+  "version": "v101",
+  "workspace": {
+    "id": 1,
+    "name": "my-workspace"
+  }
+}
diff --git a/mergin/test/test_mergin_project.py b/mergin/test/test_mergin_project.py
new file mode 100644
index 00000000..ef371897
--- /dev/null
+++ b/mergin/test/test_mergin_project.py
@@ -0,0 +1,455 @@
+import os
+import shutil
+import json
+import tempfile
+import pytest
+from mergin.merginproject import MerginProject
+from mergin.common import DeltaChangeType, PullActionType, ClientError
+from mergin.models import ProjectDeltaItem, ProjectDeltaItemDiff
+from mergin.client_pull import PullAction
+from mergin.utils import edit_conflict_file_name
+
+TEST_DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "test_data")
+
+
+def test_project_metadata():
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        test_project = "test_project_metadata"
+        api_user = "test_user"
+        project_dir = os.path.join(tmp_dir, test_project)
+
+        # prepare local project
+        shutil.copytree(TEST_DATA_DIR, project_dir)
+        # copy metadata in old format
+        os.makedirs(os.path.join(project_dir, ".mergin"), exist_ok=True)
+        metadata_file = os.path.join(project_dir, "old_metadata.json")
+
+        # load it and store as the project's metadata file
+        with open(metadata_file, "r") as f:
+            metadata = json.load(f)
+
+        project_metadata_file = os.path.join(project_dir, ".mergin", "mergin.json")
+        with open(project_metadata_file, "w") as f:
+            json.dump(metadata, f, indent=2)
+
+        # verify we have correct metadata
+        mp = MerginProject(project_dir)
+        assert mp.project_full_name() == metadata.get("name")
+        project_name = metadata.get("name").split("/")[1]
+        workspace_name = metadata.get("name").split("/")[0]
+        assert mp.project_name() == project_name
+        assert mp.workspace_name() == workspace_name
+        assert mp.version() == metadata.get("version")
+
+        # copy metadata in new format
+        metadata_file = os.path.join(project_dir, "new_metadata.json")
+        with open(metadata_file, "r") as f:
+            metadata = json.load(f)
+        metadata["namespace"] = api_user
+        with open(project_metadata_file, "w") as f:
+            json.dump(metadata, f, indent=2)
+
+        # verify we have correct metadata
+        mp = MerginProject(project_dir)
+        project_name = metadata.get("name")
+        workspace_name = metadata.get("namespace")
+        assert mp.project_full_name() == f"{workspace_name}/{project_name}"
+        assert mp.project_name() == project_name
+        assert mp.workspace_name() == workspace_name
+        assert mp.version() == metadata.get("version")
+
+        # copy metadata in new format (v2)
+        metadata_file = os.path.join(project_dir, "v2_metadata.json")
+        with open(metadata_file, "r") as f:
+            metadata = json.load(f)
+        with open(project_metadata_file, "w") as f:
+            json.dump(metadata, f, indent=2)
+
+        # verify we have correct metadata
+        mp = MerginProject(project_dir)
+        project_name = metadata.get("name")
+        workspace_name = metadata.get("workspace").get("name")
+        assert mp.project_full_name() == f"{workspace_name}/{project_name}"
+        assert mp.project_name() == project_name
+        assert mp.workspace_name() == workspace_name
+        assert mp.version() == metadata.get("version")
+        assert mp.files() == metadata.get("files")
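+
+
+# Editor's note: the three fixtures exercised above cover the metadata layouts
+# that MerginProject understands (field names taken from the test data; the
+# exact schema is an implementation detail of the client):
+#   old format: {"name": "<workspace>/<project>", "version": ...}
+#   new format: {"name": "<project>", "namespace": "<workspace>", ...}
+#   v2 format:  {"name": "<project>", "workspace": {"id": ..., "name": ...},
+#                "files": [...], ...}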
+
+
+def test_get_pull_action_valid():
+    """Test get_pull_action with valid combinations."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        mp = MerginProject(tmp_dir)
+
+        # Test cases: (server_change, local_change, expected_action)
+        test_cases = [
+            (DeltaChangeType.CREATE, None, PullActionType.COPY),
+            (DeltaChangeType.CREATE, DeltaChangeType.CREATE, PullActionType.COPY_CONFLICT),
+            (DeltaChangeType.UPDATE, None, PullActionType.COPY),
+            (DeltaChangeType.UPDATE, DeltaChangeType.UPDATE, PullActionType.COPY_CONFLICT),
+            (DeltaChangeType.UPDATE, DeltaChangeType.DELETE, PullActionType.COPY),
+            (DeltaChangeType.UPDATE, DeltaChangeType.UPDATE_DIFF, PullActionType.COPY_CONFLICT),
+            (DeltaChangeType.UPDATE_DIFF, None, PullActionType.APPLY_DIFF),
+            (DeltaChangeType.UPDATE_DIFF, DeltaChangeType.UPDATE, PullActionType.COPY_CONFLICT),
+            (DeltaChangeType.UPDATE_DIFF, DeltaChangeType.DELETE, PullActionType.COPY),
+            (DeltaChangeType.UPDATE_DIFF, DeltaChangeType.UPDATE_DIFF, PullActionType.APPLY_DIFF),
+            (DeltaChangeType.DELETE, None, PullActionType.DELETE),
+            (DeltaChangeType.DELETE, DeltaChangeType.UPDATE, None),
+            (DeltaChangeType.DELETE, DeltaChangeType.DELETE, None),
+            (DeltaChangeType.DELETE, DeltaChangeType.UPDATE_DIFF, None),
+        ]
+
+        for server_change, local_change, expected_action in test_cases:
+            action = mp.get_pull_action(server_change, local_change)
+            assert (
+                action == expected_action
+            ), f"Failed for {server_change}, {local_change}. Expected {expected_action}, got {action}"
+
+
+def test_get_pull_action_fatal():
+    """Test get_pull_action with fatal combinations."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        mp = MerginProject(tmp_dir)
+
+        fatal_cases = [
+            (DeltaChangeType.CREATE, DeltaChangeType.UPDATE),
+            (DeltaChangeType.CREATE, DeltaChangeType.DELETE),
+            (DeltaChangeType.CREATE, DeltaChangeType.UPDATE_DIFF),
+            (DeltaChangeType.UPDATE, DeltaChangeType.CREATE),
+            (DeltaChangeType.UPDATE_DIFF, DeltaChangeType.CREATE),
+            (DeltaChangeType.DELETE, DeltaChangeType.CREATE),
+        ]
+
+        for server_change, local_change in fatal_cases:
+            with pytest.raises(ClientError, match="Invalid combination of changes"):
+                mp.get_pull_action(server_change, local_change)
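+
+
+# Editor's sketch: the two tests above pin get_pull_action down as a decision
+# table over (server_change, local_change). Below is a minimal re-statement of
+# those rules, derived only from the expectations asserted above; the helper
+# name is hypothetical and it is not part of MerginProject.
+def _pull_action_sketch(server_change, local_change):
+    # fatal: CREATE on exactly one side while the other side already tracks
+    # the file in some other way
+    one_side_create = (server_change == DeltaChangeType.CREATE) != (local_change == DeltaChangeType.CREATE)
+    if one_side_create and local_change is not None:
+        raise ClientError("Invalid combination of changes")
+    if server_change == DeltaChangeType.DELETE:
+        # nothing to pull if the file also changed or vanished locally
+        return PullActionType.DELETE if local_change is None else None
+    if local_change in (None, DeltaChangeType.DELETE):
+        # no local edits to preserve: take the server content
+        if server_change == DeltaChangeType.UPDATE_DIFF and local_change is None:
+            return PullActionType.APPLY_DIFF
+        return PullActionType.COPY
+    if server_change == DeltaChangeType.UPDATE_DIFF and local_change == DeltaChangeType.UPDATE_DIFF:
+        return PullActionType.APPLY_DIFF
+    return PullActionType.COPY_CONFLICT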
+
+
+def test_get_pull_delta():
+    """Test get_pull_delta with mocked compare_file_sets."""
+    mp = MerginProject.__new__(MerginProject)
+
+    # Mock compare_file_sets return value
+    mock_changes = {
+        "added": [{"path": "new.txt", "size": 10, "checksum": "c1"}],
+        "removed": [{"path": "deleted.txt", "size": 20, "checksum": "c2"}],
+        "updated": [
+            {"path": "updated.txt", "size": 30, "checksum": "c3", "history": {}},
+            {
+                "path": "data.gpkg",
+                "size": 40,
+                "checksum": "c4",
+                "history": {
+                    "v2": {"diff": {"path": "diff_v2", "size": 5}},
+                    "v3": {"diff": {"path": "diff_v3", "size": 6}},
+                },
+            },
+        ],
+    }
+    mp.compare_file_sets = lambda local, server: mock_changes
+    mp.files = lambda: []
+    mp.version = lambda: "v1"
+    mp.is_versioned_file = lambda path: path.endswith(".gpkg")
+
+    server_info = {"files": [], "version": "v3"}
+    delta = mp.get_pull_delta(server_info)
+
+    assert len(delta.items) == 4
+
+    # Verify items
+    create_item = next(i for i in delta.items if i.path == "new.txt")
+    assert create_item.change == DeltaChangeType.CREATE
+
+    delete_item = next(i for i in delta.items if i.path == "deleted.txt")
+    assert delete_item.change == DeltaChangeType.DELETE
+
+    update_item = next(i for i in delta.items if i.path == "updated.txt")
+    assert update_item.change == DeltaChangeType.UPDATE
+
+    diff_item = next(i for i in delta.items if i.path == "data.gpkg")
+    assert diff_item.change == DeltaChangeType.UPDATE_DIFF
+    assert len(diff_item.diffs) == 2
+
+
+def test_get_local_delta():
+    """Test get_local_delta with mocked compare_file_sets."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        test_project = "delta_test_project"
+        project_dir = os.path.join(tmp_dir, test_project)
+        os.makedirs(project_dir, exist_ok=True)
+        shutil.copyfile(os.path.join(TEST_DATA_DIR, "inserted_1_A.gpkg"), os.path.join(project_dir, "base.gpkg"))
+        mp = MerginProject(project_dir)
+        shutil.copyfile(os.path.join(TEST_DATA_DIR, "base.gpkg"), os.path.join(project_dir, ".mergin", "base.gpkg"))
+
+        # Mock compare_file_sets return value
+        mock_changes = {
+            "added": [{"path": "new.txt", "size": 10, "checksum": "c1"}],
+            "removed": [{"path": "deleted.txt", "size": 20, "checksum": "c2"}],
+            "updated": [
+                {"path": "updated.txt", "size": 30, "checksum": "c3"},
+                {"path": "base.gpkg", "size": 40, "checksum": "c4"},
+            ],
+        }
+        mp.compare_file_sets = lambda local, server: mock_changes
+
+        # Mock files() to return origin info for version lookup
+        mp.files = lambda: []
+
+        mp.inspect_files = lambda: []  # Dummy return
+
+        delta_items = mp.get_local_delta(project_dir)
+        assert len(delta_items) == 4
+
+        # Verify items
+        create_item = next(i for i in delta_items if i.path == "new.txt")
+        assert create_item.change == DeltaChangeType.CREATE
+
+        delete_item = next(i for i in delta_items if i.path == "deleted.txt")
+        assert delete_item.change == DeltaChangeType.DELETE
+
+        update_item = next(i for i in delta_items if i.path == "updated.txt")
+        assert update_item.change == DeltaChangeType.UPDATE
+
+        update_diff_item = next(i for i in delta_items if i.path == "base.gpkg")
+        assert update_diff_item.change == DeltaChangeType.UPDATE_DIFF
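+
+
+# Editor's note: in the test above "base.gpkg" is classified as UPDATE_DIFF
+# because it is a geodiff-capable file and a base copy exists under .mergin/,
+# so the local edit can be expressed as a changeset rather than a whole-file
+# upload. A sketch of that, using pygeodiff's create_changeset exactly as the
+# tests below do ("local.diff" is a hypothetical output path):
+#   mp.geodiff.create_changeset(
+#       os.path.join(project_dir, ".mergin", "base.gpkg"),  # base copy
+#       os.path.join(project_dir, "base.gpkg"),             # working copy
+#       "local.diff",
+#   )
+# Plain files such as "updated.txt" fall back to whole-file UPDATE.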
+
+
+def test_apply_pull_actions_apply_diff():
+    """Test apply_pull_actions with APPLY_DIFF action, with and without rebase."""
+    with tempfile.TemporaryDirectory() as tmp_dir, tempfile.TemporaryDirectory() as tmp_dir2:
+        test_project = "apply_diff_no_rebase"
+        project_dir = os.path.join(tmp_dir, test_project)
+        os.makedirs(project_dir, exist_ok=True)
+        live = os.path.join(project_dir, "base.gpkg")
+        shutil.copyfile(os.path.join(TEST_DATA_DIR, "base.gpkg"), live)
+        mp = MerginProject(project_dir)
+        base = os.path.join(project_dir, ".mergin", "base.gpkg")
+        shutil.copyfile(os.path.join(TEST_DATA_DIR, "base.gpkg"), base)
+
+        # Check APPLY_DIFF action without rebase
+        server_diff = os.path.join(tmp_dir, "server_diff_mock.diff")
+        server_gpkg = os.path.join(tmp_dir, "base.gpkg")
+        # mimic server changes
+        mp.geodiff.create_changeset(live, os.path.join(TEST_DATA_DIR, "inserted_1_A.gpkg"), server_diff)
+        mp.geodiff.make_copy_sqlite(os.path.join(TEST_DATA_DIR, "inserted_1_A.gpkg"), server_gpkg)
+        pull_action = PullAction(
+            type=PullActionType.APPLY_DIFF,
+            pull_delta_item=ProjectDeltaItem(
+                change=DeltaChangeType.UPDATE_DIFF,
+                path="base.gpkg",
+                version="v1",
+                size=40,
+                checksum="c4",
+                diffs=[ProjectDeltaItemDiff(id="server_diff_mock.diff")],
+            ),
+        )
+        mp.apply_pull_actions([pull_action], tmp_dir, {}, None)
+        # verify that live and base have been updated
+        mp.geodiff.create_changeset(live, server_gpkg, os.path.join(tmp_dir, "live-server.diff"))
+        mp.geodiff.create_changeset(live, base, os.path.join(tmp_dir, "live-base.diff"))
+        assert not mp.geodiff.has_changes(os.path.join(tmp_dir, "live-server.diff"))
+        assert not mp.geodiff.has_changes(os.path.join(tmp_dir, "live-base.diff"))
+
+        # With rebase
+        test_project = "apply_diff_rebase"
+        project_dir = os.path.join(tmp_dir2, test_project)
+        os.makedirs(project_dir, exist_ok=True)
+        live = os.path.join(project_dir, "base.gpkg")
+        mp = MerginProject(project_dir)
+        mp.geodiff.make_copy_sqlite(os.path.join(TEST_DATA_DIR, "base.gpkg"), live)
+        base = os.path.join(project_dir, ".mergin", "base.gpkg")
+        mp.geodiff.make_copy_sqlite(os.path.join(TEST_DATA_DIR, "base.gpkg"), base)
+        shutil.copyfile(
+            os.path.join(TEST_DATA_DIR, "v2_metadata.json"), os.path.join(project_dir, ".mergin", "mergin.json")
+        )
+
+        # Check APPLY_DIFF action with rebase
+        server_gpkg = os.path.join(tmp_dir2, "base.gpkg")
+        assert os.path.exists(live)
+        # mimic server changes + local changes
+        mp.geodiff.make_copy_sqlite(os.path.join(TEST_DATA_DIR, "inserted_1_A.gpkg"), server_gpkg)
+        mp.geodiff.make_copy_sqlite(os.path.join(TEST_DATA_DIR, "inserted_1_B.gpkg"), live)  # local change
+        pull_action = PullAction(
+            type=PullActionType.APPLY_DIFF,
+            pull_delta_item=ProjectDeltaItem(
+                change=DeltaChangeType.UPDATE_DIFF,
+                path="base.gpkg",
+                version="v1",
+                size=0,
+                checksum="",
+                diffs=[ProjectDeltaItemDiff(id="server_diff_mock.diff")],
+            ),
+            local_delta_item=ProjectDeltaItem(
+                change=DeltaChangeType.UPDATE_DIFF, path="base.gpkg", version="v1", size=40, checksum="c4"
+            ),
+        )
+
+        class MockMC:
+            def has_editor_support(self):
+                return True
+
+            def username(self):
+                return "test_user"
+
+        mc = MockMC()
+        mp.apply_pull_actions([pull_action], tmp_dir2, {}, mc)
+        # local changes B' remain (fid 4 moved to 5 after rebase)
+        mp.geodiff.create_changeset(live, server_gpkg, os.path.join(tmp_dir2, "live-server.diff"))
+        mp.geodiff.create_changeset(live, base, os.path.join(tmp_dir2, "live-base.diff"))
+        assert mp.geodiff.has_changes(os.path.join(tmp_dir2, "live-server.diff"))
+        assert mp.geodiff.has_changes(os.path.join(tmp_dir2, "live-base.diff"))
+        assert not os.path.exists(edit_conflict_file_name("base.gpkg", mc.username(), "v1"))
+
+
+def test_apply_pull_actions_copy():
+    """Test apply_pull_actions with COPY action."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        test_project = "apply_copy"
+        project_dir = os.path.join(tmp_dir, test_project)
+        os.makedirs(project_dir, exist_ok=True)
+        mp = MerginProject(project_dir)
+        # mimic downloaded files in the temp dir
+        shutil.copyfile(os.path.join(TEST_DATA_DIR, "test.txt"), os.path.join(tmp_dir, "test.txt"))
+        shutil.copyfile(os.path.join(TEST_DATA_DIR, "base.gpkg"), os.path.join(tmp_dir, "base.gpkg"))
+
+        # prepare pull actions
+        pull_actions = [
+            PullAction(
+                type=PullActionType.COPY,
+                pull_delta_item=ProjectDeltaItem(
+                    change=DeltaChangeType.CREATE,
+                    path="test.txt",
+                    version="v1",
+                    size=10,
+                    checksum="c1",
+                ),
+            ),
+            PullAction(
+                type=PullActionType.COPY,
+                pull_delta_item=ProjectDeltaItem(
+                    change=DeltaChangeType.CREATE,
+                    path="base.gpkg",
+                    version="v1",
+                    size=20,
+                    checksum="c2",
+                ),
+            ),
+        ]
+        mp.apply_pull_actions(pull_actions, tmp_dir, {}, None)
+        assert os.path.exists(os.path.join(project_dir, "test.txt"))
+        assert os.path.exists(os.path.join(project_dir, "base.gpkg"))
+        assert os.path.exists(mp.fpath_meta("base.gpkg"))
+
+
+def test_apply_pull_actions_delete():
+    """Test apply_pull_actions with DELETE action."""
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        test_project = "apply_delete"
+        project_dir = os.path.join(tmp_dir, test_project)
+        os.makedirs(project_dir, exist_ok=True)
+        mp = MerginProject(project_dir)
+        # prepare local files that the pull will delete
+        shutil.copyfile(os.path.join(TEST_DATA_DIR, "test.txt"), os.path.join(project_dir, "test.txt"))
+        shutil.copyfile(os.path.join(TEST_DATA_DIR, "base.gpkg"), os.path.join(project_dir, "base.gpkg"))
+        shutil.copyfile(os.path.join(TEST_DATA_DIR, "base.gpkg"), mp.fpath_meta("base.gpkg"))
+
+        # prepare pull actions
+        pull_actions = [
+            PullAction(
+                type=PullActionType.DELETE,
+                pull_delta_item=ProjectDeltaItem(
+                    change=DeltaChangeType.DELETE,
+                    path="test.txt",
+                    version="v1",
+                    size=10,
+                    checksum="c1",
+                ),
+            ),
+            PullAction(
+                type=PullActionType.DELETE,
+                pull_delta_item=ProjectDeltaItem(
+                    change=DeltaChangeType.DELETE,
+                    path="base.gpkg",
+                    version="v1",
+                    size=20,
+                    checksum="c2",
+                ),
+            ),
+        ]
+        mp.apply_pull_actions(pull_actions, tmp_dir, {}, None)
+        assert not os.path.exists(os.path.join(project_dir, "test.txt"))
+        assert not os.path.exists(os.path.join(project_dir, "base.gpkg"))
+        assert not os.path.exists(mp.fpath_meta("base.gpkg"))
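+
+
+# Editor's note: when a pull has to preserve conflicting local edits, the
+# conflict copy's name is derived from the original path via
+# edit_conflict_file_name(path, user, version) (imported above). The rebase
+# test checks the negative case, i.e. a clean rebase must NOT leave such a
+# file behind, while the COPY_CONFLICT test below checks the positive one:
+#   edit_conflict_file_name("base.gpkg", "test_user", "v1")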
action.""" + with tempfile.TemporaryDirectory() as tmp_dir: + test_project = "apply_copy_conflict" + project_dir = os.path.join(tmp_dir, test_project) + os.makedirs(project_dir, exist_ok=True) + mp = MerginProject(project_dir) + # mimic downloaded file to temp dir + live = os.path.join(project_dir, "base.gpkg") + server_gpkg = os.path.join(tmp_dir, "base.gpkg") + base = mp.fpath_meta("base.gpkg") + shutil.copyfile(os.path.join(TEST_DATA_DIR, "test.txt"), os.path.join(tmp_dir, "test.txt")) + shutil.copyfile(os.path.join(TEST_DATA_DIR, "test.txt"), os.path.join(project_dir, "test.txt")) + shutil.copyfile(os.path.join(TEST_DATA_DIR, "inserted_1_A.gpkg"), server_gpkg) + shutil.copyfile(os.path.join(TEST_DATA_DIR, "base.gpkg"), live) + shutil.copyfile(os.path.join(TEST_DATA_DIR, "base.gpkg"), base) + shutil.copyfile(os.path.join(TEST_DATA_DIR, "v2_metadata.json"), mp.fpath_meta("mergin.json")) + with open(os.path.join(project_dir, "test.txt"), "w") as f: + f.write("local change") + + # prepare pull actions + pull_actions = [ + PullAction( + type=PullActionType.COPY_CONFLICT, + pull_delta_item=ProjectDeltaItem( + change=DeltaChangeType.UPDATE, + path="test.txt", + version="v1", + size=10, + checksum="c1", + ), + ), + PullAction( + type=PullActionType.COPY_CONFLICT, + pull_delta_item=ProjectDeltaItem( + change=DeltaChangeType.UPDATE, + path="base.gpkg", + version="v1", + size=20, + checksum="c2", + ), + ), + ] + + class MockMC: + def has_editor_support(self): + return True + + def username(self): + return "test_user" + + mc = MockMC() + conflicts = mp.apply_pull_actions(pull_actions, tmp_dir, {"role": "writer"}, mc) + assert len(conflicts) == 2 + assert os.path.exists(conflicts[0]) + assert os.path.exists(os.path.join(project_dir, "test.txt")) + with open(os.path.join(tmp_dir, "test.txt"), "r") as f, open(os.path.join(project_dir, "test.txt"), "r") as f2: + downloaded_content = f.read() + local_content = f2.read() + assert downloaded_content == local_content + + with open(conflicts[0], "r") as f: + conflict_content = f.read() + assert conflict_content == "local change" + + mp.geodiff.create_changeset(live, server_gpkg, os.path.join(tmp_dir, "live-server.diff")) + mp.geodiff.create_changeset(live, base, os.path.join(tmp_dir, "live-base.diff")) + mp.geodiff.create_changeset(live, conflicts[1], os.path.join(tmp_dir, "live-conflict.diff")) + assert not mp.geodiff.has_changes(os.path.join(tmp_dir, "live-server.diff")) + assert not mp.geodiff.has_changes(os.path.join(tmp_dir, "live-base.diff")) + assert mp.geodiff.has_changes(os.path.join(tmp_dir, "live-conflict.diff"))