2026-05-13 09:32:22 +10:00

121 lines
3.5 KiB
Python

from django.core.files.storage import Storage
from collections import namedtuple
import requests
import re
from gzip import GzipFile
import logging
logger = logging.getLogger(__name__)

# Share-link patterns. Group 1 captures the Drive id. Both patterns accept
# hyphens and underscores in the id: with the previous ``\w+`` folder pattern
# an id containing "-" was silently truncated at the hyphen, because
# ``Pattern.match`` is satisfied by a prefix. The dots in the host name are
# escaped so that they only match a literal ".".
SHARED_FOLDER = re.compile(r"https://drive\.google\.com/drive/folders/([\w\-]+)")
SHARED_FILE = re.compile(r"https://drive\.google\.com/file/d/([\w\-]+)")

# Endpoint of the Drive v3 "files" resource used for listing, metadata and
# media download.
FILES_API = "https://www.googleapis.com/drive/v3/files"
class DriveObject(namedtuple("DriveObject", ("id", "name"))):
    """A Drive item described by its id and its display name.

    The string form is ``"<id>/<name>"``; ``from_string`` parses that form
    back into an instance.
    """

    def __str__(self):
        """Render the object as ``"id/name"``."""
        return "{0.id}/{0.name}".format(self)

    @classmethod
    def from_string(cls, s):
        """Build an instance from an ``"id/name"`` string.

        Only the first "/" separates the two fields, so the name part may
        itself contain slashes.
        """
        fields = s.split("/", maxsplit=1)
        return cls(*fields)
class GDriveLinkStorage(Storage):
    """Read-only Django storage that serves files from Google Drive.

    Object names have the form ``"<drive id>/<display name>"`` (the string
    form of ``DriveObject``); only the leading id is used when talking to the
    Drive API. Every API request carries ``api_key`` as the ``key`` query
    parameter.
    """

    is_writable = False

    # Seconds to wait on a Drive request. Without an explicit timeout
    # ``requests`` may block indefinitely on a stalled connection.
    request_timeout = 30

    # MIME type the Drive API reports for folders.
    FOLDER_MIME_TYPE = "application/vnd.google-apps.folder"

    def __init__(self, api_key):
        """Remember the API key that is sent with every Drive request."""
        self.api_key = api_key
        super().__init__()

    def parse_id(self, name):
        """Return the Drive id: the part of *name* before the first "/".

        *name* may be a plain string or a ``DriveObject`` as returned by
        ``listdir``; ``str()`` converts the latter to its "id/name" form so
        that listed entries can be passed straight back into this class.
        """
        return str(name).split("/", 1)[0]

    def extract_id(self, url, *patterns):
        """Return group 1 of the first pattern that matches *url*.

        Patterns are tried in the given order using ``Pattern.match``, i.e.
        anchored at the start of *url*.

        Raises:
            FileNotFoundError: if none of *patterns* matches.
        """
        logger.debug("EXTRACT_ID: %r", url)
        for pattern in patterns:
            match = pattern.match(url)
            if match:
                return match.group(1)
        raise FileNotFoundError(f"Not a valid url: {url}")

    def get_json(self, url, params=None):
        """GET *url* and return the decoded JSON body.

        The HTTP status is not checked here, so an error payload returned by
        the server is handed back to the caller unchanged.

        Args:
            url: endpoint to request. It is written to the debug log, so it
                must not contain credentials.
            params: optional mapping of query parameters; this is where the
                API key belongs, since parameters are not logged.
        """
        logger.debug("GET_JSON: %s", url)
        response = requests.get(url, params=params, timeout=self.request_timeout)
        data = response.json()
        logger.debug("Data: %r", data)
        return data

    def listdir(self, path) -> "tuple[list[DriveObject], list[DriveObject]]":
        """List the direct children of the Drive folder named by *path*.

        An empty *path* returns two empty lists without contacting the API.
        # used to test for valid connection parameters - should do something
        # to validate API key here

        Returns:
            ``(folders, files)``, each a list of ``DriveObject``.

        Raises:
            KeyError: if a response has no ``"files"`` entry (for example
                when the API answers with an error payload).
        """
        if path == "":
            return [], []
        logger.debug("LISTDIR: %s", path)
        folder_id = self.parse_id(path)
        # Backslashes and single quotes have to be escaped inside a Drive
        # query string literal.
        quoted_id = folder_id.replace("\\", "\\\\").replace("'", "\\'")
        params = {
            "q": f"'{quoted_id}' in parents",
            # Ask explicitly for the page token so that paging works.
            "fields": "nextPageToken, files(id, name, mimeType)",
            "key": self.api_key,
        }
        folders = []
        files = []
        # The API returns results in pages; keep following nextPageToken so
        # that large folders are not silently truncated to the first page.
        while True:
            data = self.get_json(FILES_API, params)
            for item in data["files"]:
                entry = DriveObject(item["id"], item["name"])
                if item["mimeType"] == self.FOLDER_MIME_TYPE:
                    folders.append(entry)
                else:
                    files.append(entry)
            page_token = data.get("nextPageToken")
            if not page_token:
                break
            params = {**params, "pageToken": page_token}
        return folders, files

    def get_meta(self, name):
        """Return the Drive metadata JSON for the object named *name*."""
        file_id = self.parse_id(name)
        return self.get_json(f"{FILES_API}/{file_id}", {"key": self.api_key})

    def open(self, name, mode="rb"):
        """Download the object named *name* and return it as a ``GzipFile``.

        The returned file object decompresses the raw HTTP response stream as
        it is read. *mode* is accepted for interface compatibility but is
        ignored; the stream is always opened for binary reading.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        file_id = self.parse_id(name)
        url = f"{FILES_API}/{file_id}"
        # Log the URL without the query string so the API key stays out of
        # the logs.
        logger.info("URL: %s", url)
        response = requests.get(
            url,
            params={"alt": "media", "key": self.api_key},
            stream=True,
            timeout=self.request_timeout,
        )
        try:
            # Fail here with the HTTP error instead of later, when GzipFile
            # would try to decompress an error payload.
            response.raise_for_status()
        except requests.HTTPError:
            response.close()
            raise
        # GzipFile needs a str filename; *name* may be a DriveObject.
        return GzipFile(str(name), "rb", 9, response.raw)

    def size(self, name):
        """Not supported by this storage.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()

    def delete(self, name):
        """Do nothing: this storage is read-only."""
        pass

    def url(self, name):
        """Return a direct-download URL for the object named *name*."""
        logger.debug("URL: %r", name)
        file_id = self.parse_id(name)
        return f"https://drive.usercontent.google.com/download?export=download&id={file_id}&confirm=yes"

    def get_folder_id(self, url):
        """Return the folder id in a shared-folder link, or None if *url* is not one."""
        try:
            return self.extract_id(url, SHARED_FOLDER)
        except FileNotFoundError:
            return None

    def get_file_id(self, url):
        """Return the file id in a shared-file link, or None if *url* is not one."""
        try:
            return self.extract_id(url, SHARED_FILE)
        except FileNotFoundError:
            return None
"""
def import_link(self, url) -> str:
file_id = self.extract_id(url, SHARED_FILE)
meta = self.get_meta(file_id)
return f"{file_id}/{meta['name']}"
def folder_import(self, url) -> list[str]:
folder_id = self.extract_id(url, SHARED_FOLDER)
_, files = self.listdir(folder_id)
return files
"""