Skip to content

Commit fc90173

Browse files
committed
comments @varmar05
1 parent 9552372 commit fc90173

File tree

4 files changed

+124
-130
lines changed

4 files changed

+124
-130
lines changed

mergin/client.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -319,13 +319,6 @@ def delete(self, path, validate_auth=True):
319319
request = urllib.request.Request(url, method="DELETE")
320320
return self._do_request(request, validate_auth=validate_auth)
321321

322-
def head(self, path, data=None, headers={}, validate_auth=True):
323-
url = urllib.parse.urljoin(self.url, urllib.parse.quote(path))
324-
if data:
325-
url += "?" + urllib.parse.urlencode(data)
326-
request = urllib.request.Request(url, headers=headers, method="HEAD")
327-
return self._do_request(request, validate_auth=validate_auth)
328-
329322
def login(self, login, password):
330323
"""
331324
Authenticate login credentials and store session token
@@ -745,7 +738,7 @@ def project_info_v2(self, project_id: str, files_at_version=None) -> ProjectResp
745738
params = {"files_at_version": files_at_version}
746739
resp = self.get(f"/v2/projects/{project_id}", params)
747740
resp_json = json.load(resp)
748-
project_workspace = resp_json.get("workspace", {})
741+
project_workspace = resp_json.get("workspace")
749742
return ProjectResponse(
750743
id=resp_json.get("id"),
751744
name=resp_json.get("name"),

mergin/client_pull.py

Lines changed: 101 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,42 @@ def dump(self):
8080
print("--- END ---")
8181

8282

83+
class DownloadQueueItem:
84+
"""a piece of data from a project that should be downloaded - it can be either a chunk or it can be a diff"""
85+
86+
def __init__(self, file_path, size, version, diff_only, part_index, download_file_path):
87+
self.file_path = file_path # relative path to the file within project
88+
self.size = size # size of the item in bytes
89+
self.version = version # version of the file ("v123")
90+
self.diff_only = diff_only # whether downloading diff or full version
91+
self.part_index = part_index # index of the chunk
92+
self.download_file_path = download_file_path # full path to a temporary file which will receive the content
93+
94+
def __repr__(self):
95+
return "<DownloadQueueItem path={} version={} diff_only={} part_index={} size={} dest={}>".format(
96+
self.file_path, self.version, self.diff_only, self.part_index, self.size, self.download_file_path
97+
)
98+
99+
def download_blocking(self, mc, mp, project_path):
100+
"""Starts download and only returns once the file has been fully downloaded and saved"""
101+
102+
mp.log.debug(
103+
f"Downloading {self.file_path} version={self.version} diff={self.diff_only} part={self.part_index}"
104+
)
105+
start = self.part_index * (1 + CHUNK_SIZE)
106+
resp = mc.get(
107+
"/v1/project/raw/{}".format(project_path),
108+
data={"file": self.file_path, "version": self.version, "diff": self.diff_only},
109+
headers={"Range": "bytes={}-{}".format(start, start + CHUNK_SIZE)},
110+
)
111+
if resp.status in [200, 206]:
112+
mp.log.debug(f"Download finished: {self.file_path}")
113+
save_to_file(resp, self.download_file_path)
114+
else:
115+
mp.log.error(f"Download failed: {self.file_path}")
116+
raise ClientError("Failed to download part {} of file {}".format(self.part_index, self.file_path))
117+
118+
83119
class DownloadDiffQueueItem:
84120
"""Download item representing a diff file to be downloaded using v2 diff/raw endpoint"""
85121

@@ -107,20 +143,24 @@ def download_blocking(self, mc, mp):
107143
raise ClientError(f"Failed to download of diff file {self.file_path} to {self.download_file_path}")
108144

109145

110-
class FileToMerge:
146+
class DownloadFile:
111147
"""
112148
Keeps information about how to create a file (path specified by dest_file) from a couple
113149
of downloaded items (chunks) - each item is DownloadQueueItem object which has path
114-
to the temporary file containing its data. Calling merge() will create the destination file
150+
to the temporary file containing its data. Calling from_chunks() will create the destination file
115151
and remove the temporary files of the chunks
116152
"""
117153

118-
def __init__(self, dest_file, downloaded_items, size_check=True):
154+
def __init__(self, dest_file, downloaded_items: typing.List[DownloadQueueItem], size_check=True):
119155
self.dest_file = dest_file # full path to the destination file to be created
120156
self.downloaded_items = downloaded_items # list of pieces of the destination file to be merged
121157
self.size_check = size_check # whether we want to do merged file size check
122158

123-
def merge(self):
159+
def from_chunks(self):
160+
"""Merges downloaded chunks into a single file at dest_file path"""
161+
file_dir = os.path.dirname(self.dest_file)
162+
os.makedirs(file_dir, exist_ok=True)
163+
124164
with open(self.dest_file, "wb") as final:
125165
for item in self.downloaded_items:
126166
with open(item.download_file_path, "rb") as chunk:
@@ -142,8 +182,8 @@ def get_download_items(
142182
download_directory: str,
143183
download_path: Optional[str] = None,
144184
diff_only=False,
145-
):
146-
"""Returns an array of download queue items"""
185+
) -> List[DownloadQueueItem]:
186+
"""Returns an array of download queue items (chunks) for the given file"""
147187

148188
file_dir = os.path.dirname(os.path.normpath(os.path.join(download_directory, file_path)))
149189
basename = os.path.basename(download_path) if download_path else os.path.basename(file_path)
@@ -158,7 +198,7 @@ def get_download_items(
158198
return items
159199

160200

161-
def _do_download(item, mc, mp, project_path, job):
201+
def _do_download(item: typing.Union[DownloadQueueItem, DownloadDiffQueueItem], mc, mp, project_path, job):
162202
"""runs in worker thread"""
163203
if job.is_cancelled:
164204
return
@@ -341,7 +381,13 @@ class UpdateTask:
341381
"""
342382

343383
# TODO: methods other than COPY
344-
def __init__(self, file_path, download_queue_items, destination_file=None, latest_version=True):
384+
def __init__(
385+
self,
386+
file_path,
387+
download_queue_items: typing.List[DownloadQueueItem],
388+
destination_file=None,
389+
latest_version=True,
390+
):
345391
self.file_path = file_path
346392
self.destination_file = destination_file
347393
self.download_queue_items = download_queue_items
@@ -362,85 +408,48 @@ def apply(self, directory, mp):
362408
# ignore check if we download not-latest version of gpkg file (possibly reconstructed on server on demand)
363409
check_size = self.latest_version or not mp.is_versioned_file(self.file_path)
364410
# merge chunks together (and delete them afterwards)
365-
file_to_merge = FileToMerge(dest_file_path, self.download_queue_items, check_size)
366-
file_to_merge.merge()
411+
file_to_merge = DownloadFile(dest_file_path, self.download_queue_items, check_size)
412+
file_to_merge.from_chunks()
367413

368414
# Make a copy of the file to meta dir only if there is no user-specified path for the file.
369415
# destination_file is None for full project download and takes a meaningful value for a single file download.
370416
if mp.is_versioned_file(self.file_path) and self.destination_file is None:
371417
mp.geodiff.make_copy_sqlite(mp.fpath(self.file_path), mp.fpath_meta(self.file_path))
372418

373419

374-
class DownloadQueueItem:
375-
"""a piece of data from a project that should be downloaded - it can be either a chunk or it can be a diff"""
376-
377-
def __init__(self, file_path, size, version, diff_only, part_index, download_file_path):
378-
self.file_path = file_path # relative path to the file within project
379-
self.size = size # size of the item in bytes
380-
self.version = version # version of the file ("v123")
381-
self.diff_only = diff_only # whether downloading diff or full version
382-
self.part_index = part_index # index of the chunk
383-
self.download_file_path = download_file_path # full path to a temporary file which will receive the content
384-
385-
def __repr__(self):
386-
return "<DownloadQueueItem path={} version={} diff_only={} part_index={} size={} dest={}>".format(
387-
self.file_path, self.version, self.diff_only, self.part_index, self.size, self.download_file_path
388-
)
389-
390-
def download_blocking(self, mc, mp, project_path):
391-
"""Starts download and only returns once the file has been fully downloaded and saved"""
392-
393-
mp.log.debug(
394-
f"Downloading {self.file_path} version={self.version} diff={self.diff_only} part={self.part_index}"
395-
)
396-
start = self.part_index * (1 + CHUNK_SIZE)
397-
resp = mc.get(
398-
"/v1/project/raw/{}".format(project_path),
399-
data={"file": self.file_path, "version": self.version, "diff": self.diff_only},
400-
headers={"Range": "bytes={}-{}".format(start, start + CHUNK_SIZE)},
401-
)
402-
if resp.status in [200, 206]:
403-
mp.log.debug(f"Download finished: {self.file_path}")
404-
save_to_file(resp, self.download_file_path)
405-
else:
406-
mp.log.error(f"Download failed: {self.file_path}")
407-
raise ClientError("Failed to download part {} of file {}".format(self.part_index, self.file_path))
408-
409-
410420
class PullJob:
411421
def __init__(
412422
self,
413423
project_path,
414424
pull_actions,
415425
total_size,
416426
version,
417-
files_to_merge,
427+
download_files: List[DownloadFile],
418428
download_queue_items,
419429
tmp_dir,
420430
mp,
421431
project_info,
422432
basefiles_to_patch,
423433
mc,
434+
v2_pull_enabled=False,
424435
):
425436
self.project_path = project_path
426-
self.pull_actions: Optional[List[PullAction]] = (
427-
pull_actions # dictionary with changes (dict[str, list[dict]] - keys: "added", "updated", ...)
428-
)
437+
self.pull_actions: Optional[List[PullAction]] = pull_actions
429438
self.total_size = total_size # size of data to download (in bytes)
430439
self.transferred_size = 0
431440
self.version = version
432-
self.files_to_merge = files_to_merge # list of FileToMerge instances
441+
self.download_files = download_files # list of DownloadFile instances
433442
self.download_queue_items = download_queue_items
434443
self.tmp_dir = tmp_dir # TemporaryDirectory instance where we store downloaded files
435-
self.mp: MerginProject = mp # MerginProject instance
444+
self.mp: MerginProject = mp
436445
self.is_cancelled = False
437446
self.project_info = project_info # parsed JSON with project info returned from the server
438447
self.basefiles_to_patch = (
439448
basefiles_to_patch # list of tuples (relative path within project, list of diff files in temp dir to apply)
440449
)
441450
self.mc = mc
442451
self.futures = [] # list of concurrent.futures.Future instances
443-
self.v2_pull = mc.server_features().get("v2_pull_enabled", False)
452+
self.v2_pull_enabled = v2_pull_enabled # whether v2 pull mechanism is used
444453

445454
def dump(self):
446455
print("--- JOB ---", self.total_size, "bytes")
@@ -455,27 +464,16 @@ def dump(self):
455464
print("--- END ---")
456465

457466

458-
def prepare_file_destination(target_dir: str, path: str) -> str:
459-
"""Prepares destination path for downloaded files chunks"""
460-
461-
# figure out destination path for the file
462-
file_dir = os.path.dirname(os.path.normpath(os.path.join(target_dir, path)))
463-
basename = os.path.basename(path)
464-
dest_file_path = os.path.join(file_dir, basename)
465-
os.makedirs(file_dir, exist_ok=True)
466-
return dest_file_path
467-
468-
469-
def get_diff_merge_files(delta_item: ProjectDeltaItem, target_dir: str) -> List[FileToMerge]:
467+
def get_download_diff_files(delta_item: ProjectDeltaItem, target_dir: str) -> List[DownloadFile]:
470468
"""
471-
Extracts list of diff files to be downloaded from delta item using v1 endpoint.
469+
Extracts list of diff files to be downloaded from delta item using v1 endpoint per chunk.
472470
"""
473471
result = []
474472

475473
for diff in delta_item.diffs:
476-
dest_file_path = prepare_file_destination(target_dir, diff.id)
474+
dest_file_path = os.path.normpath(os.path.join(target_dir, diff.id))
477475
download_items = get_download_items(delta_item.path, diff.size, diff.version, target_dir, diff.id, True)
478-
result.append(FileToMerge(dest_file_path, download_items))
476+
result.append(DownloadFile(dest_file_path, download_items))
479477
return result
480478

481479

@@ -504,9 +502,10 @@ def pull_project_async(mc, directory) -> Optional[PullJob]:
504502
delta = None
505503
server_info = None
506504
server_version = None
507-
v2_pull = mc.server_features().get("v2_pull_enabled", False)
505+
v2_pull_enabled = mc.server_features().get("v2_pull_enabled", False)
508506
try:
509-
if v2_pull:
507+
if v2_pull_enabled:
508+
mp.log.info("Using v2 pull delta endpoint")
510509
delta: ProjectDelta = mc.get_project_delta(project_id, since=local_version)
511510
server_version = delta.to_version
512511
else:
@@ -528,8 +527,8 @@ def pull_project_async(mc, directory) -> Optional[PullJob]:
528527
mp.log.info(f"got project versions: local version {local_version} / server version {server_version}")
529528

530529
tmp_dir = tempfile.TemporaryDirectory(prefix="mm-pull-")
531-
# list of FileToMerge instances, which consists of destination path and list of download items
532-
merge_files = []
530+
# list of DownloadFile instances, which consists of destination path and list of download items
531+
download_files = []
533532
diff_files = []
534533
basefiles_to_patch = [] # list of tuples (relative path within project, list of diff files in temp dir to apply)
535534
pull_actions = []
@@ -557,15 +556,17 @@ def pull_project_async(mc, directory) -> Optional[PullJob]:
557556
mp.log.info(f"missing base file for {item.path} -> going to download it (version {server_version})")
558557
items = get_download_items(item.path, item.size, server_version, tmp_dir.name)
559558
dest_file_path = mp.fpath(item.path, tmp_dir.name)
560-
merge_files.append(FileToMerge(dest_file_path, items))
559+
download_files.append(DownloadFile(dest_file_path, items))
561560

562-
# Force use COPY action to apply the new version instead of trying to apply diffs
561+
# Force use COPY_CONFLICT action to apply the new version instead of trying to apply diffs
563562
# We are not able to get local changes anyway as base file is missing
564563
pull_action.type = PullActionType.COPY_CONFLICT
564+
continue
565565

566566
# if we have diff to apply, let's download the diff files
567567
# if we have conflict and diff update, download the diff files
568-
elif v2_pull:
568+
if v2_pull_enabled:
569+
# using v2 endpoint to download diff files, without chunks. Then we are creating DownloadDiffQueueItem instances for each diff file.
569570
diff_files.extend(
570571
[
571572
DownloadDiffQueueItem(diff_item.id, os.path.join(tmp_dir.name, diff_item.id))
@@ -576,27 +577,28 @@ def pull_project_async(mc, directory) -> Optional[PullJob]:
576577

577578
else:
578579
# fallback for diff files using v1 endpoint /raw
579-
diff_merge_files = get_diff_merge_files(item, tmp_dir.name)
580-
merge_files.extend(diff_merge_files)
580+
# download chunks and create DownloadFile instances for each diff file
581+
diff_download_files = get_download_diff_files(item, tmp_dir.name)
582+
download_files.extend(diff_download_files)
581583
basefiles_to_patch.append((item.path, [diff.id for diff in item.diffs]))
582584

583-
# let's check the base file existence
584585
elif pull_action_type == PullActionType.COPY or pull_action_type == PullActionType.COPY_CONFLICT:
585586
# simply download the server version of the files
586-
dest_file_path = prepare_file_destination(tmp_dir.name, item.path)
587+
dest_file_path = os.path.normpath(os.path.join(tmp_dir.name, item.path))
587588
download_items = get_download_items(item.path, item.size, server_version, tmp_dir.name)
588-
merge_files.append(FileToMerge(dest_file_path, download_items))
589+
download_files.append(DownloadFile(dest_file_path, download_items))
589590

590591
pull_actions.append(pull_action)
591592
# Do nothing for DELETE actions
592593

593594
# make a single list of items to download
594595
total_size = 0
595596
download_queue_items = []
597+
# Diff files downloaded without chunks (downloaded as a whole, do not need to be merged)
596598
for diff_file in diff_files:
597599
download_queue_items.append(diff_file)
598600
total_size += diff_file.size
599-
for file_to_merge in merge_files:
601+
for file_to_merge in download_files:
600602
download_queue_items.extend(file_to_merge.downloaded_items)
601603
for item in file_to_merge.downloaded_items:
602604
total_size += item.size
@@ -608,13 +610,14 @@ def pull_project_async(mc, directory) -> Optional[PullJob]:
608610
pull_actions,
609611
total_size,
610612
server_version,
611-
merge_files,
613+
download_files,
612614
download_queue_items,
613615
tmp_dir,
614616
mp,
615617
server_info,
616618
basefiles_to_patch,
617619
mc,
620+
v2_pull_enabled,
618621
)
619622

620623
# start download
@@ -683,19 +686,28 @@ def pull_project_finalize(job: PullJob):
683686
raise future.exception()
684687

685688
job.mp.log.info("finalizing pull")
686-
if not job.project_info and job.v2_pull:
687-
project_info_response = job.mc.project_info(job.project_path, version=job.version)
688-
job.project_info = asdict(project_info_response)
689+
try:
690+
if not job.project_info and job.v2_pull_enabled:
691+
project_info_response = job.mc.project_info_v2(job.mp.project_id(), files_at_version=job.version)
692+
job.project_info = asdict(project_info_response)
693+
except NotImplementedError as e:
694+
job.mp.log.error("Failed to get project info v2 in this server version: " + str(e))
695+
job.mp.log.info("--- pull aborted")
696+
raise ClientError("Failed to get project info v2 as server not support it: " + str(e))
697+
except ClientError as e:
698+
job.mp.log.error("Failed to get project info v2: " + str(e))
699+
job.mp.log.info("--- pull aborted")
700+
raise
689701

690702
if not job.project_info:
691703
job.mp.log.error("No project info available to finalize pull")
692704
job.mp.log.info("--- pull aborted")
693705
raise ClientError("No project info available to finalize pull")
694706

695-
# merge downloaded chunks
707+
# create files from downloaded chunks
696708
try:
697-
for file_to_merge in job.files_to_merge:
698-
file_to_merge.merge()
709+
for download_file in job.download_files:
710+
download_file.from_chunks()
699711
except ClientError as err:
700712
job.mp.log.error("Error merging chunks of downloaded file: " + str(err))
701713
job.mp.log.info("--- pull aborted")
@@ -817,7 +829,7 @@ def download_diffs_async(mc, project_directory, file_path, versions):
817829
dest_file_path = mp.fpath_cache(diff["path"], version=file["version"])
818830
if os.path.exists(dest_file_path):
819831
continue
820-
files_to_merge.append(FileToMerge(dest_file_path, items))
832+
files_to_merge.append(DownloadFile(dest_file_path, items))
821833
download_list.extend(items)
822834
for item in items:
823835
total_size += item.size

0 commit comments

Comments
 (0)