148 lines
4.7 KiB
Python
148 lines
4.7 KiB
Python
from django.core.files.storage import Storage
|
|
from collections import namedtuple
|
|
import requests
|
|
import re
|
|
from gzip import GzipFile
|
|
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SHARED_FOLDER = re.compile(
|
|
r"https://drive.google.com/drive[u0-9\/]+folders/([\w\-]+)(\?resourcekey=([\w\-]+))?"
|
|
)
|
|
SHARED_FILE = re.compile(
|
|
r"https://drive.google.com/file/d/([\w\-]+)(\?resourcekey=([\w\-]+))?"
|
|
)
|
|
|
|
FILES_API = "https://www.googleapis.com/drive/v3/files"
|
|
|
|
|
|
class DriveObject(namedtuple("DriveObject", ("id", "key", "name"))):
|
|
@classmethod
|
|
def from_string(cls, s: str):
|
|
resource, _, name = s.partition("/")
|
|
id, _, key = resource.partition("#")
|
|
return cls(id, key, name)
|
|
|
|
def __str__(self):
|
|
resource = f"{self.id}#{self.key}" if self.key else self.id
|
|
return f"{resource}/{self.name}"
|
|
|
|
|
|
class GDriveLinkStorage(Storage):
|
|
is_writable = False
|
|
|
|
def __init__(self, api_key):
|
|
self.api_key = api_key
|
|
super().__init__()
|
|
|
|
def parse_resource(self, name) -> DriveObject:
|
|
return DriveObject.from_string(name)
|
|
|
|
def extract_resource(self, url, *patterns) -> DriveObject:
|
|
logger.debug("EXTRACT_RESOURCE: %r", url)
|
|
for pattern in patterns:
|
|
match = pattern.match(url)
|
|
if match:
|
|
groups = match.groups()
|
|
logger.debug(groups)
|
|
if len(groups) == 3:
|
|
return DriveObject(groups[0], groups[2], "")
|
|
return DriveObject(groups[0], None, "")
|
|
raise FileNotFoundError(f"Not a valid url: {url}")
|
|
|
|
def get_json(self, url, resource: DriveObject):
|
|
headers = {}
|
|
if resource.key:
|
|
headers["X-Goog-Drive-Resource-Keys"] = f"{resource.id}/{resource.key}"
|
|
|
|
logger.debug("GET_JSON: %s %r", url, headers)
|
|
response = requests.get(url, headers=headers)
|
|
data = response.json()
|
|
logger.debug("Data: %r", data)
|
|
return data
|
|
|
|
def listdir(self, path) -> tuple[list[str], list[str]]:
|
|
# used to test for valid connection parameters - should do something to validate API key here
|
|
logger.debug("listdir: %s", path)
|
|
if path == "":
|
|
return [], []
|
|
|
|
files = []
|
|
folders = []
|
|
|
|
folder = self.parse_resource(path)
|
|
url = f"{FILES_API}?q='{folder.id}'+in+parents&key={self.api_key}"
|
|
|
|
data = self.get_json(url, folder)
|
|
while True:
|
|
for x in data["files"]:
|
|
if x["mimeType"] == "application/vnd.google-apps.folder":
|
|
folders.append(
|
|
DriveObject(x["id"], x.get("resourceKey"), x["name"])
|
|
)
|
|
else:
|
|
files.append(DriveObject(x["id"], x.get("resourceKey"), x["name"]))
|
|
|
|
token = data.get("nextPageToken")
|
|
if token is None:
|
|
return folders, files
|
|
data = self.get_json(f"{url}&pageToken={token}", folder)
|
|
|
|
def get_meta(self, name):
|
|
file_resource = self.parse_resource(name)
|
|
url = f"{FILES_API}/{file_resource.id}?key={self.api_key}"
|
|
return self.get_json(url, file_resource)
|
|
|
|
def open(self, name, mode="rb"):
|
|
resource = self.parse_resource(name)
|
|
url = f"{FILES_API}/{resource.id}?alt=media&key={self.api_key}"
|
|
|
|
headers = {}
|
|
if resource.key:
|
|
headers["X-Goog-Drive-Resource-Keys"] = f"{resource.id}/{resource.key}"
|
|
logger.info("URL: %s [%r]", url, headers)
|
|
|
|
response = requests.get(url, headers=headers, stream=True)
|
|
return GzipFile(name, "rb", 9, response.raw)
|
|
|
|
def size(self, name):
|
|
raise NotImplementedError()
|
|
|
|
def delete(self, name):
|
|
pass
|
|
|
|
def url(self, name):
|
|
logger.debug("URL: %r", name)
|
|
resource = self.parse_resource(name)
|
|
uri = f"https://drive.usercontent.google.com/download?export=download&id={resource.id}&confirm=yes"
|
|
if resource.key:
|
|
uri += f"&resourcekey=${resource.key}"
|
|
return uri
|
|
|
|
def get_folder_id(self, url):
|
|
try:
|
|
return self.extract_resource(url, SHARED_FOLDER)
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
def get_file_id(self, url):
|
|
try:
|
|
return self.extract_resource(url, SHARED_FILE)
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
def import_link(self, url) -> str:
|
|
file_resource = self.extract_resource(url, SHARED_FILE)
|
|
meta = self.get_meta(file_resource)
|
|
return f"{file_resource}/{meta['name']}"
|
|
|
|
|
|
"""
|
|
def folder_import(self, url) -> list[str]:
|
|
folder_id = self.extract_id(url, SHARED_FOLDER)
|
|
_, files = self.listdir(folder_id)
|
|
return files
|
|
"""
|