Compare commits

..

2 Commits

Author SHA1 Message Date
b1ea75cec0 Merge pull request 'GdriveFolder code' (#12) from gdrive into master
Reviewed-on: #12
2026-05-13 09:35:48 +10:00
7d041e1fd0 GdriveFolder code 2026-05-13 09:32:22 +10:00
11 changed files with 414 additions and 15 deletions

View File

@ -44,3 +44,11 @@ class ProjectEnsembleChoiceField(forms.ModelChoiceField):
class ProjectSelectForm(BaseForm):
    """Form with a single field for choosing one of the existing projects."""

    project = ProjectEnsembleChoiceField(
        queryset=Project.objects.all(),
    )
class DocumentLinkForm(BaseForm):
    """Form that accepts one URL pointing at an item in the collection's storage."""

    link = forms.URLField(
        help_text="Paste the direct link relevant to this storage",
    )
class DocumentBulkForm(BaseForm):
    """Form that accepts one URL pointing at a folder in the collection's storage."""

    folder_link = forms.URLField(
        help_text="Paste the folder link for this storage",
    )

View File

@ -0,0 +1,75 @@
from library.models import Collection, Work, WorkMeta, Document
import logging
logger = logging.getLogger(__name__)
def sync_work(work: Work):
    """Add a PDF document to *work* for every new PDF in its linked folder.

    The folder id is read from the work's ``folderid`` meta entry. Files whose
    id already appears in one of the work's uploads are left alone, files
    whose name does not end in ``.pdf`` are ignored, and local uploads whose
    id is no longer listed in the folder are only reported with a warning.
    """
    logger.info("Syncing '%s'", work.name)
    drive_folder = work.meta_info.get(name="folderid").value
    backend = work.collection.storage.instance()
    scheme = work.collection.storage.name
    _, remote_files = backend.listdir(drive_folder)

    # Uploads are stored as "<storage name>:<id>/<file name>"; keep only the id.
    unmatched = {
        backend.parse_id(upload.partition(":")[2])
        for upload in work.docs.values_list("upload", flat=True)
    }
    logger.debug("%d existing documents", len(unmatched))

    for remote in remote_files:
        if remote.id in unmatched:
            logger.debug("%30s: Skipping existing (%s)", remote.name, remote.id)
            unmatched.discard(remote.id)
        elif not remote.name.lower().endswith(".pdf"):
            logger.debug("%40s: Not a PDF", remote.name)
        else:
            logger.info("%40s: Adding", remote.name)
            created = work.docs.create(
                upload=f"{scheme}:{remote}", doctype=Document.DOCTYPE_PDF
            )
            created.auto_tag()

    # Whatever was not matched above exists locally but not in the folder.
    for uri in unmatched:
        logger.warning("Local entry not in folder: %s", uri)
def sync_collection(collection: Collection, sync_existing: bool = False):
    """Create a work for every new sub-folder of the collection's drive folder.

    The collection's ``prefix`` holds the id of the parent folder. Each
    sub-folder not yet linked to a work (through a ``folderid`` meta entry)
    gets a new work named after it, which is then synced with ``sync_work``.

    Args:
        collection: The collection to synchronise.
        sync_existing: When true, works that are already linked to a listed
            folder are re-synced as well; otherwise they are left alone.

    Raises:
        RuntimeError: If the collection's storage is not a GDriveLinkStorage.
        KeyError: If the collection has no prefix to use as the folder id.
    """
    logger.info("Syncing '%s'", collection)
    if not collection.storage.storage.endswith("GDriveLinkStorage"):
        raise RuntimeError("Not a gdrive storage")
    if not collection.prefix:
        raise KeyError("Prefix must store folder id")

    # Maps folder id -> pk of the work already linked to it. Entries are
    # removed as their folders are seen, so the leftovers are stale links.
    unmatched = dict(
        WorkMeta.objects.filter(
            work__collection=collection, name="folderid"
        ).values_list("value", "work_id")
    )
    storage = collection.storage.instance()
    folders, _ = storage.listdir(collection.prefix)

    for folder in folders:
        if folder.id in unmatched:
            work_id = unmatched.pop(folder.id)
            if sync_existing:
                logger.info("%40s: Syncing (%s)", folder.name, folder.id[:12])
                sync_work(Work.objects.get(pk=work_id))
            continue
        logger.info("%40s: Adding", folder.name)
        work = Work(name=folder.name, collection=collection)
        work.save()
        work.meta_info.create(name="folderid", value=folder.id)
        sync_work(work)

    # Iterate the items: looping over the dict itself yields only the keys,
    # which cannot be unpacked into (folder id, work pk).
    for folderid, work_id in unmatched.items():
        logger.warning(
            "Folder for work %d no longer in drive (%s)", work_id, folderid
        )

View File

@ -0,0 +1,120 @@
from django.core.files.storage import Storage
from collections import namedtuple
import requests
import re
from gzip import GzipFile
import logging
logger = logging.getLogger(__name__)
# Shared-link URL patterns; group 1 captures the Drive id. Ids may contain
# "-" as well as word characters, so both patterns accept it, and the dots of
# the host name are escaped so they only match a literal ".".
SHARED_FOLDER = re.compile(r"https://drive\.google\.com/drive/folders/([\w\-]+)")
SHARED_FILE = re.compile(r"https://drive\.google\.com/file/d/([\w\-]+)")
# Base endpoint of the Drive v3 "files" resource.
FILES_API = "https://www.googleapis.com/drive/v3/files"
class DriveObject(namedtuple("DriveObject", ("id", "name"))):
    """An (id, name) pair for a drive entry, serialised as ``"<id>/<name>"``."""

    @classmethod
    def from_string(cls, s):
        """Build an instance from ``"<id>/<name>"``, splitting at the first slash."""
        pieces = s.split("/", 1)
        return cls(*pieces)

    def __str__(self):
        """Return the ``"<id>/<name>"`` form accepted by ``from_string``."""
        return "{}/{}".format(self.id, self.name)
class GDriveLinkStorage(Storage):
    """Read-only Django storage for publicly shared Google Drive items.

    Names handled by this storage look like ``"<drive id>/<display name>"``;
    only the id part is sent to the Drive API. Requests carry the API key
    given at construction time.
    """

    is_writable = False
    # Seconds to wait on Drive before giving up, so a stalled connection
    # cannot block the caller forever.
    request_timeout = 30

    def __init__(self, api_key):
        self.api_key = api_key
        super().__init__()

    def _redact(self, url):
        """Return *url* with the API key masked, for safe logging."""
        if not self.api_key:
            return url
        return url.replace(self.api_key, "***")

    def parse_id(self, name):
        """Return the drive id, i.e. everything before the first ``/``."""
        return name.partition("/")[0]

    def extract_id(self, url, *patterns):
        """Return group 1 of the first pattern that matches *url*.

        Raises:
            FileNotFoundError: If none of the patterns match.
        """
        logger.debug("EXTRACT_ID: %r", url)
        for pattern in patterns:
            match = pattern.match(url)
            if match:
                return match.group(1)
        raise FileNotFoundError(f"Not a valid url: {url}")

    def get_json(self, url):
        """GET *url* and return the decoded JSON body."""
        # The URL embeds the API key, so it is masked before logging.
        logger.debug("GET_JSON: %s", self._redact(url))
        response = requests.get(url, timeout=self.request_timeout)
        data = response.json()
        logger.debug("Data: %r", data)
        return data

    def listdir(self, path) -> tuple[list[DriveObject], list[DriveObject]]:
        """List the children of the folder identified by *path*.

        Returns:
            A ``(folders, files)`` pair of ``DriveObject`` lists. An empty
            path yields two empty lists.
        """
        # used to test for valid connection parameters - should do something to validate API key here
        if path == "":
            return [], []
        from urllib.parse import quote

        logger.debug("LISTDIR: %s", path)
        folder_id = self.parse_id(path)
        base_url = f"{FILES_API}?q='{folder_id}'+in+parents&key={self.api_key}"
        files = []
        folders = []
        page_token = None
        # The API returns results in pages; keep following nextPageToken so
        # large folders are listed completely.
        while True:
            url = base_url
            if page_token:
                url = f"{base_url}&pageToken={quote(page_token, safe='')}"
            data = self.get_json(url)
            for x in data["files"]:
                entry = DriveObject(x["id"], x["name"])
                if x["mimeType"] == "application/vnd.google-apps.folder":
                    folders.append(entry)
                else:
                    files.append(entry)
            page_token = data.get("nextPageToken")
            if not page_token:
                break
        return folders, files

    def get_meta(self, name):
        """Return the metadata JSON the API reports for *name*."""
        file_id = self.parse_id(name)
        url = f"{FILES_API}/{file_id}?key={self.api_key}"
        return self.get_json(url)

    def open(self, name, mode="rb"):
        """Return a file-like object streaming the content of *name*.

        The *mode* argument is ignored; the stream is always read-only binary.
        """
        file_id = self.parse_id(name)
        url = f"{FILES_API}/{file_id}?alt=media&key={self.api_key}"
        logger.info("URL: %s", self._redact(url))
        response = requests.get(url, stream=True, timeout=self.request_timeout)
        # NOTE(review): this assumes the raw body is always gzip-encoded, and
        # the response is never explicitly closed - confirm against the API.
        return GzipFile(name, "rb", 9, response.raw)

    def size(self, name):
        raise NotImplementedError()

    def delete(self, name):
        # Nothing is removed from the drive; deleting is a no-op here.
        pass

    def url(self, name):
        """Return a direct-download URL for *name*."""
        logger.debug("URL: %r", name)
        file_id = self.parse_id(name)
        return f"https://drive.usercontent.google.com/download?export=download&id={file_id}&confirm=yes"

    def get_folder_id(self, url):
        """Return the folder id in a shared-folder link, or None if *url* is not one."""
        try:
            return self.extract_id(url, SHARED_FOLDER)
        except FileNotFoundError:
            return None

    def get_file_id(self, url):
        """Return the file id in a shared-file link, or None if *url* is not one."""
        try:
            return self.extract_id(url, SHARED_FILE)
        except FileNotFoundError:
            return None


# Disabled helpers, kept verbatim as an unused string literal.
"""
def import_link(self, url) -> str:
file_id = self.extract_id(url, SHARED_FILE)
meta = self.get_meta(file_id)
return f"{file_id}/{meta['name']}"
def folder_import(self, url) -> list[str]:
folder_id = self.extract_id(url, SHARED_FOLDER)
_, files = self.listdir(folder_id)
return files
"""

View File

@ -0,0 +1,60 @@
from django.shortcuts import resolve_url, redirect
from django.views.generic import FormView
from django.views.generic.detail import SingleObjectMixin
from library.views import CollectionMixin
from library.models import Work, Document
from library import forms
class WorkGDriveView(CollectionMixin, SingleObjectMixin, FormView):
    """Link a work to a shared drive folder, or add a single shared file to it.

    A pasted folder link is stored as the work's ``folderid`` meta entry; any
    other link is added to the work as a PDF document.
    """

    model = Work
    template_name = "library/gdrive.html"
    form_class = forms.DocumentLinkForm

    @property
    def cancel_url(self):
        return resolve_url("work_detail", self.collection.pk, self.kwargs["pk"])

    def get_context_data(self, *args, **kwargs):
        """Add the work's meta entries to the context as ``meta`` (name -> value)."""
        self.object = self.get_object()
        data = super().get_context_data(*args, **kwargs)
        data["meta"] = dict(self.object.meta_info.values_list("name", "value"))
        return data

    def form_valid(self, form):
        """Store a folder link on the work, or create a document from a file link."""
        link = form.cleaned_data["link"]
        storage = self.collection.storage.instance()
        self.object = self.get_object()

        # get_folder_id may signal "not a folder link" either by returning
        # None or by raising FileNotFoundError; treat both the same so a file
        # link is never stored as the work's folder id.
        try:
            folderid = storage.get_folder_id(link)
        except FileNotFoundError:
            folderid = None
        if folderid is not None:
            self.object.meta_info.update_or_create(
                name="folderid", defaults={"value": folderid}
            )
            return redirect("work_detail", self.collection.pk, self.kwargs["pk"])

        # Only storages that provide import_link translate the URL; looking
        # the attribute up first keeps AttributeErrors raised inside the
        # import itself from being silently swallowed.
        # NOTE(review): without import_link the raw URL is stored as the
        # upload name - confirm that is intended for every storage.
        import_link = getattr(storage, "import_link", None)
        if import_link is not None:
            try:
                link = import_link(link)
            except FileNotFoundError as e:
                form.add_error("link", str(e))
                return self.form_invalid(form)

        work = self.collection.works.get(pk=self.kwargs["pk"])
        doc = Document(
            work=work,
            upload=f"{self.collection.storage.name}:{link}",
            doctype=Document.DOCTYPE_PDF,
        )
        doc.save()
        doc.auto_tag()
        return redirect("work_detail", self.collection.pk, self.kwargs["pk"])

View File

@ -0,0 +1,25 @@
from django.core.management.base import BaseCommand, CommandError
from library.models import Work, Collection
from library.gdrive import sync_work, sync_collection
class Command(BaseCommand):
    """Management command that syncs one work or one collection by primary key."""

    help = "Synchronizes folders and works"

    def add_arguments(self, parser):
        parser.add_argument("type", type=str, choices=["work", "collection"])
        parser.add_argument("pk", nargs="?", type=int)

    def handle(self, *args, **options):
        """Look up the requested object and run the matching sync function.

        Raises:
            CommandError: If no pk was given, the object does not exist, or
                the object type is unknown.
        """
        pk = options["pk"]
        # pk is optional on the command line, but both sync types need it;
        # report that cleanly instead of failing inside the ORM lookup.
        if pk is None:
            raise CommandError("A primary key is required")

        if options["type"] == "work":
            try:
                work = Work.objects.get(pk=pk)
            except Work.DoesNotExist:
                raise CommandError(f"Work {pk} does not exist") from None
            sync_work(work)
            return

        if options["type"] == "collection":
            try:
                collection = Collection.objects.get(pk=pk)
            except Collection.DoesNotExist:
                raise CommandError(f"Collection {pk} does not exist") from None
            sync_collection(collection)
            return

        raise CommandError("Unknown object type")

View File

@ -11,7 +11,7 @@ import os.path
from byostorage.user import BYOStorage
from byostorage.cached import CachedStorage
from library.music_tags import MusicTag
from library.music_tags import MusicTag, auto_tag
from interface.utils import sign_data
import logging
@ -383,6 +383,7 @@ class WorkMeta(models.Model):
("genre", "Genre"),
("style", "Style"),
("orchestration", "Orchestration"),
("folderid", "GDrive"),
)
work = models.ForeignKey(Work, on_delete=models.CASCADE, related_name="meta_info")
@ -427,6 +428,14 @@ class Document(models.Model):
created = models.DateTimeField(auto_now_add=True)
version = models.CharField(max_length=30, blank=True)
def auto_tag(self):
    """Attach a section tagged from the upload's file name, for PDF documents.

    Nothing happens for other document types or when no tag is recognised.
    """
    if self.doctype == self.DOCTYPE_PDF:
        basename = os.path.basename(str(self.upload))
        # Inside the method, auto_tag is the module-level helper.
        instrument = auto_tag(basename)
        if instrument:
            self.sections.get_or_create(tag=instrument.abbreviate())
def delete(self, *args, **kwargs):
    """Delete the stored upload, then delete the document itself."""
    # save=False: the instance is about to be deleted, so it is not re-saved.
    self.upload.delete(save=False)
    outcome = super().delete(*args, **kwargs)
    return outcome

View File

@ -233,7 +233,11 @@ class MusicTag(namedtuple("MusicTag", ("name", "variant"), defaults=[None])):
return self.name
PATTERNS = [re.compile(r"([A-Za-z]+)[_\- ]*(\d+)"), re.compile(r"([A-Za-z]+)()")]
# File-name patterns, listed in the order they are stored. "inst" captures a
# run of letters and "ord" a run of digits.
PATTERNS = [
    # Letters followed by optional "_", "-" or space separators, then digits.
    re.compile(r"(?P<inst>[A-Za-z]+)[_\- ]*(?P<ord>\d+)"),
    # Digits with an st/nd/rd/th suffix, optional separators, then letters.
    re.compile(r"(?P<ord>\d+)(st|nd|rd|th)[_\- ]*(?P<inst>[A-Za-z]+)"),
    # Letters only; this pattern has no "ord" group.
    re.compile(r"(?P<inst>[A-Za-z]+)()"),
]
def auto_tag(filename):
@ -249,12 +253,20 @@ def auto_tag(filename):
MusicTag(name='Viola', variant=None)
>>> auto_tag('Ode to Joy - fl-2 (piccolo).pdf')
MusicTag(name='Flute', variant=2)
>>> auto_tag('1st Violin - Ode to Joy.pdf')
MusicTag(name='Violin', variant=1)
>>> auto_tag('Ode to Joy - 2nd Violin.pdf')
MusicTag(name='Violin', variant=2)
"""
for pattern in PATTERNS:
for inst, ordinal in pattern.findall(filename):
inst = inst.lower()
ordinal = int(ordinal) if ordinal else None
for m in pattern.finditer(filename):
inst = m["inst"].lower()
try:
ordinal = int(m["ord"])
except IndexError:
ordinal = None
if inst in MUSIC_TAG_BY_NAME:
return MusicTag(inst.title(), ordinal)
if inst in MUSIC_NAME_BY_TAG:

View File

@ -0,0 +1,41 @@
{% extends "interface/project_base.html" %}
{% block admin %}
<a href="{% url 'work_detail' collection=collection.pk pk=object.pk %}" class="button is-link is-light">
<span>Back to work</span>
</a>
{% endblock %}
{% block page %}
<h3 class="subtitle"><a href="{% url 'work_detail' collection.pk object.pk %}">{{ object.name }}</a></h3>
<div class="m-3">
<p>This page lets you link a work to a google drive folder. You can either paste a public link to a folder to enable syncing or a file to add individually</p>
</div>
<div class="m-3">
<p>
{% if meta.folderid %}
This work is currently linked to <b>{{ meta.folderid }}</b>. Pasting a new folder link will overwrite this.
{% else %}
There is currently no shared drive folder linked to this work - paste one here to enable syncing.
{% endif %}
</p>
</div>
<div>
<form method="post">
<div class="field">
<div class="control">
<input name="link" class="input is-expanded" type="text" placeholder="Shared link">
</div>
{% for error in form.errors.link %}
<p class="help is-danger">{{ error }}</p>
{% endfor %}
</div>
<div class="field">
<div class="control">
<button class="button is-info" type="submit">Add</button>
</div>
</div>
{% csrf_token %}
</form>
</div>
{% endblock %}

View File

@ -128,10 +128,17 @@
</div>
{% if request.is_admin %}
<div class="column is-one-quarter">
<h4 class="is-size-5">Upload files</h4>
<h4 class="is-size-5">Add Files</h4>
{% if "gdrive" in methods %}
<div class="has-text-centered mt-3">
<a class="button button-primary" href="{% url 'work_gdrive' collection.pk object.pk %}">Link Google Drive Files</a><br/>
</div>
{% endif %}
{% if "upload" in methods %}
<form action="{% url 'document_add' collection.pk object.pk %}" class="dropzone" id="doc-upload" style="-moz-user-select: none">
{% csrf_token %}
</form>
{% endif %}
</div>
{% endif %}
</div>

View File

@ -1,6 +1,7 @@
from django.urls import path
from . import views
from .gdrive import views as gdrive_views
from library.views import api
@ -64,6 +65,11 @@ urlpatterns = [
views.WorkAddDocumentView.as_view(),
name="document_add",
),
path(
"collections/<int:collection>/works/<int:pk>/gdrive",
gdrive_views.WorkGDriveView.as_view(),
name="work_gdrive",
),
path(
"collections/<int:collection>/works/<int:pk>/download",
views.WorkDownloadView.as_view(),

View File

@ -1,4 +1,5 @@
from django.shortcuts import get_object_or_404, redirect, resolve_url
from django.http import HttpRequest
from django.views.generic import TemplateView
from django.views.generic.detail import DetailView, SingleObjectMixin, View
from django.views.generic.list import ListView
@ -19,7 +20,7 @@ import string
from interface.views import ProjectMixin, AuthorizedResourceMixin
from interface.utils import signed_url
from library.models import Collection, Work, Document, Section
from library.music_tags import MUSIC_TAGS, MusicTag, auto_tag
from library.music_tags import MUSIC_TAGS, MusicTag
from library import forms, models
from library.pdf_utils import extract_pages, extract_and_concat
from library.indexer import indexer, model_search
@ -33,14 +34,15 @@ class ProjectItemListView(ProjectMixin, ListView):
template_name = "library/item_list.html"
model = models.ProjectItem
def post(self, request, **kwargs):
def post(self, request: HttpRequest, **kwargs):
project_works = self.project.works.all()
instruments = request.POST.getlist("instruments")
works = request.POST.getlist("works")
self.request.session["part"] = request.POST.get("part", "")
self.request.session["instrument"] = request.POST.get("instrument")
request.session["part"] = request.POST.get("part", "")
request.session["instrument"] = request.POST.get("instrument")
valid_pks = [x.pk for x in project_works]
@ -314,6 +316,18 @@ class WorkAddView(CollectionMixin, FormView):
class WorkDetailView(CollectionMixin, DetailView):
    """Detail page for a work; exposes the ways files can be added to it."""

    model = models.Work

    def get_context_data(self, *args, **kwargs):
        """Add ``methods``, the set of add-file methods the template may offer.

        Collections on GDriveLinkStorage offer ``"gdrive"``; every other
        storage offers ``"upload"``.
        """
        context = super().get_context_data(*args, **kwargs)
        # A set literal is required here: set("upload") would split the word
        # into single characters, so "upload" would never be a member.
        if self.collection.storage.storage == "library.storage.GDriveLinkStorage":
            methods = {"gdrive"}
        else:
            methods = {"upload"}
        context["methods"] = methods
        return context
class WorkUpdateView(CollectionMixin, UpdateView):
model = models.Work
@ -471,13 +485,10 @@ class WorkAddDocumentView(CollectionMixin, CreateView):
# auto tag the document
# name, ext = os.path.splitext(os.path.basename(doc.upload.name))
if doc.doctype == models.Document.DOCTYPE_PDF:
inst = auto_tag(orig_name)
if inst:
doc.sections.create(tag=inst.abbreviate())
doc.auto_tag()
if self.request.headers["Accept"] == "application/json":
os.path.basename(doc.upload.name)
# filename = os.path.basename(doc.upload.name)
return JsonResponse(
{
"message": "created",
@ -497,6 +508,31 @@ class WorkAddDocumentView(CollectionMixin, CreateView):
return redirect("document_annotate", self.collection.pk, doc.pk)
class WorkAddDocumentBulkView(CollectionMixin, FormView):
    """Add every file of a linked folder to a work as a PDF document."""

    template_name = "interface/default_form.html"
    form_class = forms.DocumentBulkForm

    @property
    def cancel_url(self):
        return resolve_url("work_detail", self.collection.pk, self.kwargs["pk"])

    def form_valid(self, form):
        """Import the folder's files, skipping uploads the work already has.

        Storages without ``folder_import`` and links the storage rejects are
        reported as errors on the ``folder_link`` field.
        """
        folder_link = form.cleaned_data["folder_link"]
        # An unknown work is a 404, not an unhandled DoesNotExist.
        work = get_object_or_404(self.collection.works, pk=self.kwargs["pk"])

        storage = self.collection.storage.instance()
        folder_import = getattr(storage, "folder_import", None)
        if folder_import is None:
            form.add_error(
                "folder_link", "This storage does not support importing folders"
            )
            return self.form_invalid(form)
        try:
            links = folder_import(folder_link)
        except FileNotFoundError as e:
            form.add_error("folder_link", str(e))
            return self.form_invalid(form)

        current = set(work.docs.values_list("upload", flat=True))
        logger.info("Current documents: %r", current)
        for link in links:
            uri = f"{self.collection.storage.name}:{link}"
            if uri in current:
                continue
            doc = work.docs.create(upload=uri, doctype=Document.DOCTYPE_PDF)
            doc.auto_tag()
            # Remember it so a repeated entry in the listing is added once.
            current.add(uri)
        return redirect("work_detail", self.collection.pk, self.kwargs["pk"])
class DocumentMixin(CollectionMixin):
model = models.Document