Added handling of gdrive resource_keys - fixes #16

This commit is contained in:
Tris Forster 2026-05-27 22:46:08 +10:00
parent e46d8145a7
commit 5468f6d3e7
2 changed files with 72 additions and 43 deletions

View File

@ -7,7 +7,10 @@ logger = logging.getLogger(__name__)
def sync_work(work: Work): def sync_work(work: Work):
folder_id = work.meta_info.get(name="folderid").value try:
folder_id = work.meta_info.get(name="folderid").value
except WorkMeta.DoesNotExist as err:
raise IndexError("Work not currently linked to a gdrive folder") from err
logger.info("Syncing '%s' from %r", work.name, folder_id) logger.info("Syncing '%s' from %r", work.name, folder_id)
@ -15,7 +18,7 @@ def sync_work(work: Work):
existing = set( existing = set(
[ [
storage.parse_id(x.partition(":")[2]) storage.parse_resource(x.partition(":")[2]).id
for x in work.docs.values_list("upload", flat=True) for x in work.docs.values_list("upload", flat=True)
] ]
) )
@ -42,16 +45,19 @@ def sync_work(work: Work):
logger.warning("Local entry not in folder: %s", uri) logger.warning("Local entry not in folder: %s", uri)
def sync_partial_collection(collection: Collection, sync_existing: bool = True):
    """Sync every work in ``collection`` that has a ``folderid`` meta entry.

    Each matching work is passed to ``sync_work`` in queryset order.

    Args:
        collection: Collection whose works are scanned.
        sync_existing: Accepted for signature parity with ``sync_collection``;
            this function does not read it.
    """
    linked_works = Work.objects.filter(
        collection=collection,
        meta_info__name="folderid",
    )
    for linked_work in linked_works:
        sync_work(linked_work)
def sync_collection(collection: Collection, sync_existing: bool = False): def sync_collection(collection: Collection, sync_existing: bool = False):
logger.info("Syncing '%s'", collection) logger.info("Syncing '%s'", collection)
if not collection.storage.storage.endswith("GDriveLinkStorage"): if not collection.storage.storage.endswith("GDriveLinkStorage"):
raise RuntimeError("Not a gdrive storage") return sync_partial_collection(collection, sync_existing)
try:
folder_id = collection.settings["folder_id"]
except KeyError:
raise KeyError("Missing 'folder_id' in settings")
existing = dict( existing = dict(
WorkMeta.objects.filter( WorkMeta.objects.filter(
@ -59,8 +65,9 @@ def sync_collection(collection: Collection, sync_existing: bool = False):
).values_list("value", "work_id") ).values_list("value", "work_id")
) )
folder = collection.prefix
storage = collection.storage.instance() storage = collection.storage.instance()
folders, _ = storage.listdir(folder_id) folders, _ = storage.listdir(folder)
for folder in folders: for folder in folders:
if folder[0] == "_": if folder[0] == "_":

View File

@ -8,19 +8,26 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Share-link patterns. Both expose exactly three groups so callers can index
# them uniformly: group 1 is the Drive id, group 2 is the (optional) span up
# to and including the resourcekey parameter, group 3 is the resource key.
#
# The host dots are escaped so that only drive.google.com matches, and the
# resourcekey parameter may appear anywhere in the query string (for example
# after "/view?usp=sharing&"), not only immediately after the id.
SHARED_FOLDER = re.compile(
    r"https://drive\.google\.com/drive[u0-9\/]+folders/([\w\-]+)"
    r"([^#]*?[?&]resourcekey=([\w\-]+))?"
)
SHARED_FILE = re.compile(
    r"https://drive\.google\.com/file/d/([\w\-]+)"
    r"([^#]*?[?&]resourcekey=([\w\-]+))?"
)
FILES_API = "https://www.googleapis.com/drive/v3/files" FILES_API = "https://www.googleapis.com/drive/v3/files"
class DriveObject(namedtuple("DriveObject", ("id", "key", "name"))):
    """Reference to a Drive item: its id, optional resource key, and name.

    The string form is ``<id>/<name>``, or ``<id>#<key>/<name>`` when a
    resource key is present.
    """

    @classmethod
    def from_string(cls, s: str):
        """Parse the string form produced by ``__str__``.

        Only the first ``/`` separates the resource from the name, so the
        name may itself contain slashes. A missing or empty key is stored as
        ``None`` so that parsed objects compare equal to objects built with
        ``None`` for "no key".
        """
        resource, _, name = s.partition("/")
        resource_id, _, key = resource.partition("#")
        return cls(resource_id, key or None, name)

    def __str__(self):
        """Return ``<id>[#<key>]/<name>``; the key part is omitted when falsy."""
        resource = f"{self.id}#{self.key}" if self.key else self.id
        return f"{resource}/{self.name}"
class GDriveLinkStorage(Storage): class GDriveLinkStorage(Storage):
@ -30,21 +37,28 @@ class GDriveLinkStorage(Storage):
self.api_key = api_key self.api_key = api_key
super().__init__() super().__init__()
def parse_resource(self, name) -> DriveObject:
    """Return the ``DriveObject`` described by ``name``.

    ``name`` is normally the stored string form (``<id>[#<key>]/<name>``).
    An existing ``DriveObject`` is returned unchanged, so callers that
    already hold a parsed resource can pass it straight through.
    """
    if isinstance(name, DriveObject):
        return name
    return DriveObject.from_string(name)
def extract_resource(self, url, *patterns) -> DriveObject:
    """Build a ``DriveObject`` from the first pattern that matches ``url``.

    Group 1 of the match is used as the id. When the pattern exposes exactly
    three groups, group 3 is used as the resource key (it may be ``None`` if
    that group did not participate); otherwise the key is ``None``. The name
    is always the empty string.

    Raises:
        FileNotFoundError: if none of ``patterns`` matches ``url``.
    """
    logger.debug("EXTRACT_RESOURCE: %r", url)
    for candidate in patterns:
        found = candidate.match(url)
        if not found:
            continue
        captured = found.groups()
        logger.debug(captured)
        key = captured[2] if len(captured) == 3 else None
        return DriveObject(captured[0], key, "")
    raise FileNotFoundError(f"Not a valid url: {url}")
def get_json(self, url, resource: DriveObject):
    """GET ``url`` and return the decoded JSON body.

    When ``resource`` carries a resource key, it is sent in the
    ``X-Goog-Drive-Resource-Keys`` header as ``<id>/<key>``. The URL, the
    headers and the decoded payload are logged at debug level.
    """
    headers = (
        {"X-Goog-Drive-Resource-Keys": f"{resource.id}/{resource.key}"}
        if resource.key
        else {}
    )
    logger.debug("GET_JSON: %s %r", url, headers)
    payload = requests.get(url, headers=headers).json()
    logger.debug("Data: %r", payload)
    return payload
@ -55,31 +69,36 @@ class GDriveLinkStorage(Storage):
if path == "": if path == "":
return [], [] return [], []
folder_id = self.parse_id(path) folder = self.parse_resource(path)
url = f"{FILES_API}?q='{folder_id}'+in+parents&key={self.api_key}" url = f"{FILES_API}?q='{folder.id}'+in+parents&key={self.api_key}"
data = self.get_json(url) data = self.get_json(url, folder)
files = [] files = []
folders = [] folders = []
for x in data["files"]: for x in data["files"]:
if x["mimeType"] == "application/vnd.google-apps.folder": if x["mimeType"] == "application/vnd.google-apps.folder":
# folders.append(f"{x['id']}/{x['name']}") # folders.append(f"{x['id']}/{x['name']}")
folders.append(DriveObject(x["id"], x["name"])) folders.append(DriveObject(x["id"], x.get("resourceKey"), x["name"]))
else: else:
# files.append(f"{x['id']}/{x['name']}") # files.append(f"{x['id']}/{x['name']}")
files.append(DriveObject(x["id"], x["name"])) files.append(DriveObject(x["id"], x.get("resourceKey"), x["name"]))
return folders, files return folders, files
def get_meta(self, name):
    """Fetch the Files API metadata JSON for the resource named by ``name``.

    ``name`` is parsed with ``parse_resource``; the resulting id is placed in
    the request URL and the parsed resource is forwarded to ``get_json``.
    """
    target = self.parse_resource(name)
    meta_url = f"{FILES_API}/{target.id}?key={self.api_key}"
    return self.get_json(meta_url, target)
def open(self, name, mode="rb"):
    """Stream the file's content from the Files API through a ``GzipFile``.

    The download is requested with ``alt=media``. When the parsed resource
    carries a resource key, it is sent in the ``X-Goog-Drive-Resource-Keys``
    header as ``<id>/<key>``.

    Note: ``mode`` is not read here; the ``GzipFile`` is always opened "rb".
    """
    resource = self.parse_resource(name)
    url = f"{FILES_API}/{resource.id}?alt=media&key={self.api_key}"
    headers = (
        {"X-Goog-Drive-Resource-Keys": f"{resource.id}/{resource.key}"}
        if resource.key
        else {}
    )
    logger.info("URL: %s [%r]", url, headers)
    raw_stream = requests.get(url, headers=headers, stream=True).raw
    return GzipFile(name, "rb", 9, raw_stream)
def size(self, name): def size(self, name):
@ -90,25 +109,28 @@ class GDriveLinkStorage(Storage):
def url(self, name):
    """Return the direct-download URL for the resource named by ``name``.

    The resource key, when present, is appended as the ``resourcekey`` query
    parameter.
    """
    logger.debug("URL: %r", name)
    resource = self.parse_resource(name)
    uri = f"https://drive.usercontent.google.com/download?export=download&id={resource.id}&confirm=yes"
    if resource.key:
        # Plain f-string interpolation: a "$" here would be emitted literally
        # and corrupt the key value.
        uri += f"&resourcekey={resource.key}"
    return uri
def get_folder_id(self, url):
    """Return the resource parsed from a shared-folder URL, or ``None``.

    Despite the name, the value returned on success is whatever
    ``extract_resource`` yields for ``SHARED_FOLDER`` (a ``DriveObject``),
    not a bare id string. ``None`` is returned when the URL does not match.
    """
    try:
        resource = self.extract_resource(url, SHARED_FOLDER)
    except FileNotFoundError:
        resource = None
    return resource
def get_file_id(self, url):
    """Return the resource parsed from a shared-file URL, or ``None``.

    Despite the name, the value returned on success is whatever
    ``extract_resource`` yields for ``SHARED_FILE`` (a ``DriveObject``),
    not a bare id string. ``None`` is returned when the URL does not match.
    """
    try:
        resource = self.extract_resource(url, SHARED_FILE)
    except FileNotFoundError:
        resource = None
    return resource
def import_link(self, url) -> str:
    """Convert a shared-file URL into the stored ``<id>[#<key>]/<name>`` form.

    The id (and resource key, if any) come from the URL; the name comes from
    the ``name`` field of the file's metadata.

    Raises:
        FileNotFoundError: if ``url`` is not a shared-file link.
    """
    file_resource = self.extract_resource(url, SHARED_FILE)
    # get_meta expects the string form, so serialise the resource first.
    meta = self.get_meta(str(file_resource))
    # Fill in the name on the tuple and let __str__ build the result; string
    # concatenation would double the "/" because the name is still empty.
    return str(file_resource._replace(name=meta["name"]))
""" """