"""Read-only Django storage backend that reads files through the Google Drive v3 files API."""

import logging
import re
from collections import namedtuple
from gzip import GzipFile

import requests
from django.core.files.storage import Storage

logger = logging.getLogger(__name__)

# Both patterns accept "-" in the ID: a bare \w+ would stop at the first
# hyphen and silently yield a truncated ID. Dots are escaped so that only the
# literal host name matches.
SHARED_FOLDER = re.compile(r"https://drive\.google\.com/drive/folders/([\w\-]+)")
SHARED_FILE = re.compile(r"https://drive\.google\.com/file/d/([\w\-]+)")
FILES_API = "https://www.googleapis.com/drive/v3/files"

# Seconds to wait for the API before giving up; without a timeout
# ``requests`` can block indefinitely.
REQUEST_TIMEOUT = 30


class DriveObject(namedtuple("DriveObject", ("id", "name"))):
    """A Drive entry made of an ``id`` and a ``name``, rendered as ``"id/name"``."""

    @classmethod
    def from_string(cls, s):
        """Build an instance from an ``"id/name"`` string, splitting on the first ``/``."""
        return cls(*s.split("/", 1))

    def __str__(self):
        return f"{self.id}/{self.name}"


class GDriveLinkStorage(Storage):
    """Storage that lists and downloads Drive files using an API key.

    Names handled by this class are ``"id/name"`` strings (or ``DriveObject``
    instances); only the leading ID is used to address the API.
    """

    is_writable = False

    def __init__(self, api_key):
        self.api_key = api_key
        super().__init__()

    def parse_id(self, name):
        """Return the Drive ID for *name*.

        *name* may be a ``DriveObject`` (as returned by ``listdir``) or a
        string whose first ``/``-separated component is the ID.
        """
        if isinstance(name, DriveObject):
            return name.id
        return name.split("/")[0]

    def extract_id(self, url, *patterns):
        """Return the first capture group of the first pattern matching *url*.

        Raises:
            FileNotFoundError: if none of *patterns* matches the start of *url*.
        """
        logger.debug("EXTRACT_ID: %r", url)
        for pattern in patterns:
            match = pattern.match(url)
            if match:
                return match.group(1)
        raise FileNotFoundError(f"Not a valid url: {url}")

    def get_json(self, url, params=None):
        """GET *url* and return the decoded JSON body.

        *params* is an optional mapping of query parameters. Passing the API
        key this way (rather than embedding it in *url*) keeps it out of the
        log line below and lets ``requests`` URL-encode the values.

        NOTE(review): the HTTP status is not checked, so an error payload is
        returned to the caller like any other JSON document.
        """
        logger.debug("GET_JSON: %s", url)
        response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        data = response.json()
        logger.debug("Data: %r", data)
        return data

    def listdir(self, path) -> tuple[list[DriveObject], list[DriveObject]]:
        """Return ``(folders, files)`` contained in the folder named by *path*.

        An empty *path* yields two empty lists without contacting the API.
        All result pages are followed via ``nextPageToken`` so large folders
        are not cut off after the first page.
        """
        # used to test for valid connection parameters - should do something
        # to validate API key here
        if path == "":
            return [], []
        logger.debug("LISTDIR: %s", path)
        folder_id = self.parse_id(path)
        params = {"q": f"'{folder_id}' in parents", "key": self.api_key}
        files = []
        folders = []
        while True:
            data = self.get_json(FILES_API, params)
            for entry in data["files"]:
                target = (
                    folders
                    if entry["mimeType"] == "application/vnd.google-apps.folder"
                    else files
                )
                target.append(DriveObject(entry["id"], entry["name"]))
            page_token = data.get("nextPageToken")
            if not page_token:
                break
            params["pageToken"] = page_token
        return folders, files

    def get_meta(self, name):
        """Return the API's JSON metadata document for the file named by *name*."""
        file_id = self.parse_id(name)
        return self.get_json(f"{FILES_API}/{file_id}", {"key": self.api_key})

    def open(self, name, mode="rb"):
        """Stream the file's content and return it wrapped in a ``GzipFile``.

        *mode* is accepted for interface compatibility but ignored: the
        stream is always opened ``"rb"``.

        NOTE(review): this assumes the stored content is gzip-compressed and
        does not check the HTTP status before wrapping the body - confirm.
        """
        file_id = self.parse_id(name)
        url = f"{FILES_API}/{file_id}"
        # Log the URL without the query string so the API key never reaches
        # the logs.
        logger.info("URL: %s", url)
        response = requests.get(
            url,
            params={"alt": "media", "key": self.api_key},
            stream=True,
            timeout=REQUEST_TIMEOUT,
        )
        # GzipFile requires a str/bytes/PathLike filename, so a DriveObject
        # is rendered to its "id/name" form first.
        return GzipFile(
            filename=str(name), mode="rb", compresslevel=9, fileobj=response.raw
        )

    def size(self, name):
        """Not supported by this storage."""
        raise NotImplementedError()

    def delete(self, name):
        """Do nothing; the storage is declared non-writable."""
        pass

    def url(self, name):
        """Return a direct-download URL for the file named by *name*."""
        logger.debug("URL: %r", name)
        file_id = self.parse_id(name)
        return f"https://drive.usercontent.google.com/download?export=download&id={file_id}&confirm=yes"

    def get_folder_id(self, url):
        """Return the folder ID embedded in a shared-folder *url*, or ``None``."""
        try:
            return self.extract_id(url, SHARED_FOLDER)
        except FileNotFoundError:
            return None

    def get_file_id(self, url):
        """Return the file ID embedded in a shared-file *url*, or ``None``."""
        try:
            return self.extract_id(url, SHARED_FILE)
        except FileNotFoundError:
            return None

    # NOTE(review): the helpers below are disabled by being wrapped in a
    # string literal; kept as found.
    """
    def import_link(self, url) -> str:
        file_id = self.extract_id(url, SHARED_FILE)
        meta = self.get_meta(file_id)
        return f"{file_id}/{meta['name']}"

    def folder_import(self, url) -> list[str]:
        folder_id = self.extract_id(url, SHARED_FOLDER)
        _, files = self.listdir(folder_id)
        return files
    """