polyphonic/app/library/views/__init__.py
2026-05-13 09:32:22 +10:00

651 lines
20 KiB
Python

from django.shortcuts import get_object_or_404, redirect, resolve_url
from django.http import HttpRequest
from django.views.generic import TemplateView
from django.views.generic.detail import DetailView, SingleObjectMixin, View
from django.views.generic.list import ListView
from django.views.generic.edit import CreateView, FormView, UpdateView, DeleteView
from django.http import FileResponse, HttpResponse, JsonResponse
from django.db.models import Q, Sum
from django.db import transaction
from django.utils.timezone import now
from django.template.loader import render_to_string
from django.core.exceptions import SuspiciousOperation
from django.core.paginator import Paginator
from django.http import Http404, HttpResponseRedirect
import json
import os.path
import string
from interface.views import ProjectMixin, AuthorizedResourceMixin
from interface.utils import signed_url
from library.models import Collection, Work, Document, Section
from library.music_tags import MUSIC_TAGS, MusicTag
from library import forms, models
from library.pdf_utils import extract_pages, extract_and_concat
from library.indexer import indexer, model_search
import logging
logger = logging.getLogger(__name__)
class ProjectItemListView(ProjectMixin, ListView):
    """List a project's items and build a combined PDF of selected parts.

    ``GET`` renders the item list. ``POST`` receives the parallel form lists
    ``works`` and ``instruments`` and answers with one PDF produced by
    ``extract_and_concat`` from the chosen section of each work.
    """

    template_name = "library/item_list.html"
    model = models.ProjectItem

    def post(self, request: HttpRequest, **kwargs):
        """Return an inline PDF concatenating the requested part of each work.

        Entry *i* of ``instruments`` is the section tag wanted for entry *i*
        of ``works``; the tag ``"-"`` skips that work.

        Raises:
            SuspiciousOperation: a work has no matching ``instruments`` entry,
                or a work pk is not an integer or not one of the project's
                works (rendered by Django as a 400 response).
            Http404: a requested work has no section carrying the tag.
        """
        instruments = request.POST.getlist("instruments")
        works = request.POST.getlist("works")

        # Remember the last choices in the session (read back in
        # get_context_data).
        request.session["part"] = request.POST.get("part", "")
        request.session["instrument"] = request.POST.get("instrument")

        if len(instruments) < len(works):
            # Previously surfaced as an IndexError (HTTP 500).
            raise SuspiciousOperation("Missing instrument for one or more works")

        valid_pks = set(self.project.works.values_list("pk", flat=True))
        sections = []
        for pk, tag in zip(works, instruments):
            try:
                work_pk = int(pk)
            except ValueError:
                raise SuspiciousOperation(f"Not a valid work pk: {pk}") from None
            if work_pk not in valid_pks:
                raise SuspiciousOperation(f"Not a valid work pk: {pk}")
            if tag == "-":
                continue
            try:
                # doc__work is joined as well because the work name is read
                # below; this avoids one extra query per section.
                part = Section.objects.select_related("doc__work").get(
                    tag=tag, doc__work=work_pk
                )
            except Section.DoesNotExist:
                raise Http404(
                    f"No section tagged {tag!r} for work {work_pk}"
                ) from None
            # NOTE: several sections with the same tag for one work still
            # raise MultipleObjectsReturned, exactly as before.
            sections.append(
                (part.doc.upload.path, part.doc.work.name, part.start, part.end, 1)
            )

        result = extract_and_concat(sections)
        download_name = f"{self.project.name}.pdf"
        response = FileResponse(result, content_type="application/pdf")
        response["Content-Disposition"] = f'inline; filename="{download_name}"'
        return response

    def get_queryset(self):
        """Return the parent queryset with ``project`` and ``work`` joined."""
        return super().get_queryset().select_related("project", "work")

    def get_context_data(self, **kwargs):
        """Add the instrument list, remembered choices and total running time."""
        data = super().get_context_data(**kwargs)
        data["instruments"] = MUSIC_TAGS
        data["instrument"] = self.request.session.get("instrument", "Score")
        data["part"] = self.request.session.get("part", "0")
        totals = self.get_queryset().aggregate(Sum("work__running_time"))
        data["running_time"] = totals["work__running_time__sum"]
        return data
class ProjectItemManageView(ProjectMixin, ListView):
    """Management page for a project's items; ``POST`` reorders or removes them."""

    template_name = "library/item_list_manage.html"
    model = models.ProjectItem

    def post(self, request, **kwargs):
        """Apply a JSON object mapping item pk to its new order.

        An order of ``-1`` deletes the item; any other value is stored as the
        item's ``order``. All changes are applied in one transaction.

        Returns:
            204 on success, 400 when the body is not a JSON object whose
            values are integers.
        """
        self.request = request
        self.kwargs = kwargs

        # Validate the whole payload before touching the database so a bad
        # entry cannot leave the list half-updated.
        try:
            data = json.loads(request.body)
            changes = [(pk, int(order)) for pk, order in data.items()]
        except (AttributeError, TypeError, ValueError):
            # AttributeError: payload is not an object; TypeError/ValueError:
            # invalid JSON or a non-integer order value.
            return HttpResponse(status=400)

        items = self.get_queryset()
        with transaction.atomic():
            for pk, order in changes:
                if order == -1:
                    items.filter(pk=pk).delete()
                else:
                    items.filter(pk=pk).update(order=order)
        return HttpResponse(status=204)

    def get_queryset(self):
        """Return the parent queryset with ``project`` and ``work`` joined."""
        return super().get_queryset().select_related("project", "work")
class ProjectItemAddView(ProjectMixin, UpdateView):
    """Update view that edits the current project through ``PlaylistAddForm``."""

    template_name = "interface/default_form.html"
    form_class = forms.PlaylistAddForm

    def get_object(self, queryset=None):
        """Return whatever ``get_project()`` yields; ``queryset`` is ignored."""
        project = self.get_project()
        return project

    def get_success_url(self):
        """Return the ``item_list_manage`` URL for the project in the URL kwargs."""
        project_ref = self.kwargs["project"]
        return resolve_url("item_list_manage", project=project_ref)
""" COLLECTION VIEWS """
class CollectionMixin(AuthorizedResourceMixin):
    """Mixin for views addressed by a ``collection`` URL kwarg.

    The collection is loaded during the authorization check, added to the
    template context and used to narrow the queryset.
    """

    collection: Collection | None = None

    def get_queryset(self):
        """Return the parent queryset filtered to the current collection."""
        base = super().get_queryset()
        return base.filter(collection=self.collection)

    def get_context_data(self, **kwargs):
        """Add ``collection`` to the context once it has been loaded."""
        context = super().get_context_data(**kwargs)
        if not self.collection:
            return context
        context["collection"] = self.collection
        return context

    def is_authorized(self):
        """Load the collection (404 if missing) and decide whether access is allowed.

        Checks run in order: the parent's own check, then collection
        administrators (which also flags ``request.is_admin``), then an
        authorized key matching the collection's nonce.
        """
        pk = self.kwargs["collection"]
        self.collection = get_object_or_404(models.Collection, pk=pk)

        if super().is_authorized():
            return True

        if self.collection.has_administrator(self.request.user):
            self.request.is_admin = True
            return True

        key_ok = self.is_authorized_key("collection", pk, self.collection.nonce)
        return True if key_ok else False
class CollectionListView(ListView):
    """Paginated list of the collections the current user may see."""

    paginate_by = 20

    def get_queryset(self):
        """Return collections ordered by name, limited by the user's role.

        Anonymous users get nothing and staff get everything. Other users
        get collections they administer or that allow an ensemble they
        administer.
        """
        user = self.request.user
        if user.is_anonymous:
            return models.Collection.objects.none()

        collections = models.Collection.objects.order_by("name")
        if user.is_staff:
            return collections

        # Both conditions span multi-valued relations, so the join can yield
        # the same collection more than once; distinct() collapses those rows.
        return collections.filter(
            Q(administrators=user) | Q(allowed_ensembles__ensemble__admins=user)
        ).distinct()
class WorkListView(CollectionMixin, TemplateView):
    """Base page listing works, either browsed by initial letter or searched.

    Subclasses provide ``get_works()`` (the queryset to browse) and
    ``get_collections()`` (the collection pks handed to ``model_search``).
    """

    paginate_by = 20
    template_name = "library/work_list.html"

    def get_context_data(self, *args, **kwargs):
        """Build the page context from the ``q``, ``start`` and ``page`` parameters.

        With a non-empty ``q`` the search results are listed; otherwise the
        works are browsed, optionally restricted to names beginning with
        ``start``. An unparsable ``page`` falls back to page 1.
        """
        data = super().get_context_data(*args, **kwargs)
        data["title"] = "My Library"
        data["collection"] = self.collection
        data["query"] = self.request.GET.get("q", "")

        try:
            page = int(self.request.GET.get("page", 1))
        except ValueError:
            # e.g. ``?page=abc`` — show the first page instead of failing.
            page = 1

        if data["query"]:
            qs = self.get_results(data["query"], page)
            # get_results() returns a plain list, which has no ``meta``,
            # when there is nothing to search in.
            data["meta"] = getattr(qs, "meta", None)
        else:
            qs = self.get_queryset()
            start = self.request.GET.get("start")
            if start:
                start = start.upper()
                # "~" sorts after ASCII letters and digits, so this range
                # keeps names that begin with ``start``.
                qs = qs.filter(name__gte=start, name__lt=start + "~")
            data["start"] = start
            data["letters"] = string.ascii_uppercase

        paginator = Paginator(qs, self.paginate_by)
        page_obj = paginator.get_page(page)
        data["page_obj"] = page_obj
        data["object_list"] = page_obj
        # get_page() clamps out-of-range numbers but get_elided_page_range()
        # raises on them, so pass the number of the page actually shown.
        data["page_range"] = paginator.get_elided_page_range(page_obj.number)
        return data

    def get_works(self):
        """Return the queryset of works to browse (implemented by subclasses)."""
        raise NotImplementedError

    def get_collections(self):
        """Return the collection pks to search in (implemented by subclasses)."""
        raise NotImplementedError

    def get_queryset(self):
        """Return the browsable works in a stable order, without duplicates."""
        works = self.get_works()
        return works.order_by("name", "composer", "edition", "pk").distinct()

    def get_results(self, query, page):
        """Run ``model_search`` for ``query``; an ``IndexError`` from
        ``get_collections()`` yields an empty list."""
        try:
            collections = self.get_collections()
        except IndexError:
            return []
        return model_search(query, collections, page, self.paginate_by)
class LibraryWorkListView(WorkListView):
    """Work list across every collection the current user administers.

    Superusers see all works. The per-collection authorization of
    ``CollectionMixin`` is bypassed because no single collection is addressed.
    """

    def is_authorized(self):
        """Always allow access; visibility is limited by the querysets below."""
        return True

    def get_collections(self):
        """Return the pks of the collections to search in.

        Superusers get an empty list and anonymous users an empty queryset
        (filtering on an anonymous user would raise ``TypeError``).
        """
        user = self.request.user
        if user.is_superuser:
            # NOTE(review): presumably model_search treats an empty list as
            # "no restriction" — confirm, because users without any collection
            # also end up passing an empty sequence.
            return []
        if user.is_anonymous:
            administered = models.Collection.objects.none()
        else:
            administered = models.Collection.objects.filter(administrators=user)
        return administered.values_list("pk", flat=True)

    def get_works(self):
        """Return all works for superusers, else those in administered collections."""
        if self.request.user.is_superuser:
            return Work.objects.all()
        return Work.objects.filter(collection_id__in=self.get_collections())
class CollectionWorkListView(WorkListView):
    """Work list for one collection, also reachable through a shared auth link."""

    def request_denied(self):
        """Handle a denied request, honouring an ``auth`` query parameter.

        When ``auth`` matches ``collection.auth()`` the collection key is
        recorded through ``add_authorized_key`` and the client is redirected
        to the same path without the query string. Without ``auth`` the
        parent's handling applies.

        Raises:
            SuspiciousOperation: ``auth`` is present but does not match.
        """
        # Local import keeps this block self-contained.
        from django.utils.crypto import constant_time_compare

        if "auth" in self.request.GET:
            supplied = self.request.GET["auth"]
            # Constant-time comparison so response timing does not reveal how
            # much of a guessed token was correct.
            if not constant_time_compare(supplied, self.collection.auth()):
                raise SuspiciousOperation("Bad collection link")
            self.add_authorized_key(
                "collection", self.collection.pk, self.collection.nonce
            )
            return HttpResponseRedirect(self.request.path)
        return super().request_denied()

    def get_collections(self):
        """Return a one-element list holding this collection's pk."""
        return [int(self.kwargs["collection"])]

    def get_works(self):
        """Return the works belonging to the collection in the URL."""
        works = Work.objects.filter(collection=self.kwargs["collection"])
        # if self.request.is_admin:
        #    loan_count = Count('project_items', Q(project_items__checkout__lte=now(), project_items__returned=None))
        #    works = works.annotate(loan_count=loan_count)
        return works

    def get_context_data(self, *args, **kwargs):
        """Use the collection's name as the page title."""
        data = super().get_context_data(*args, **kwargs)
        data["title"] = self.collection.name
        return data
class WorkAddView(CollectionMixin, FormView):
    """Create a work in the current collection, optionally with uploaded files."""

    template_name = "interface/default_form.html"
    form_class = forms.WorkCreateForm
    title = "Add a new work"

    def get_form(self, form_class=None):
        """Limit ``orchestration`` choices to global ones and this collection's."""
        form = super().get_form(form_class)
        qs = models.Orchestration.objects.filter(
            Q(collection=None) | Q(collection=self.collection)
        )
        form.fields["orchestration"].queryset = qs.order_by("-collection_id", "pk")
        return form

    def form_valid(self, form):
        """Save the work and its uploads, index it, then redirect.

        With exactly one upload the user goes straight to annotating that
        document; otherwise to the work's detail page.
        """
        uploads = self.request.FILES.getlist("uploads")

        # One transaction: a failing upload must not leave a work behind
        # without its documents.
        with transaction.atomic():
            work = form.save(commit=False)
            work.collection_id = self.collection.pk
            work.save()
            # Needed after save(commit=False); a no-op when the form has no
            # many-to-many fields.
            form.save_m2m()
            doc_pks = [work.docs.create(upload=f).pk for f in uploads]

        ix = indexer.get_index()
        indexer.index_works(ix, [work])

        if len(doc_pks) == 1:
            # Pass the collection pk as well, matching the redirect in
            # WorkAddDocumentView; document views read a "collection" URL
            # kwarg.
            return redirect("document_annotate", self.collection.pk, doc_pks[0])
        return redirect("work_detail", collection=self.collection.pk, pk=work.pk)
class WorkDetailView(CollectionMixin, DetailView):
    """Detail page for a single work."""

    model = models.Work

    def get_context_data(self, *args, **kwargs):
        """Add ``methods``: the ways documents can be added to this work.

        ``{"upload"}`` by default; ``{"gdrive"}`` when the collection's
        storage is ``library.storage.GDriveLinkStorage``.
        """
        context = super().get_context_data(*args, **kwargs)
        # A set literal is required here: set("upload") would be a set of the
        # single characters of the word.
        methods = {"upload"}
        if self.collection.storage.storage == "library.storage.GDriveLinkStorage":
            methods.discard("upload")
            methods.add("gdrive")
        context["methods"] = methods
        return context
class WorkUpdateView(CollectionMixin, UpdateView):
    """Edit an existing work and refresh its search-index entry."""

    template_name = "interface/default_form.html"
    form_class = forms.WorkCreateForm
    model = models.Work

    def get_success_url(self):
        """Return the ``work_detail`` URL of the edited work."""
        work_pk = self.kwargs["pk"]
        return resolve_url("work_detail", self.collection.pk, work_pk)

    def form_valid(self, form):
        """Save through the parent, then re-index the saved work."""
        outcome = super().form_valid(form)
        index = indexer.get_index()
        indexer.index_works(index, [self.object])
        return outcome
class WorkAddToProject(CollectionMixin, FormView):
    """Let an administrator add a work of this collection to a project."""

    admin_required = True
    form_class = forms.ProjectSelectForm
    template_name = "interface/default_form.html"
    title = "Select project to add work to"

    def get_object(self):
        """Return the work named by the ``pk`` URL kwarg.

        The lookup is limited to the current collection, like the one in
        ``WorkAddDocumentBulkView``; a missing work is a 404, not a 500.
        """
        return get_object_or_404(
            Work, pk=self.kwargs["pk"], collection=self.collection
        )

    def get_form(self, form_class=None):
        """Return the form with ``project`` limited to eligible projects."""
        f = super().get_form(form_class)
        qs = f.fields["project"].queryset.select_related("ensemble")
        # Limit to projects for ensembles where we are an admin and they haven't occurred yet
        qs = qs.for_user(self.request.user).current()
        # Don't show projects the work has already been added to.
        work = self.get_object()
        qs = qs.exclude(pk__in=work.projects.all())
        f.fields["project"].queryset = qs.order_by("ensemble__name", "name")
        return f

    def form_valid(self, form):
        """Create the project item, checked out now and approved by the user."""
        work = self.get_object()
        project = form.cleaned_data["project"]
        work.project_items.create(
            project=project, approved_by=self.request.user, checkout=now()
        )
        return redirect("item_list", project=project.pk)
class WorkPartsView(CollectionMixin, DetailView):
    """Render a single work with the parts-fragment template."""

    template_name = "library/work_parts_fragment.html"
    model = models.Work
class WorkPartSetView(CollectionMixin, DetailView):
    """Page for assembling a set of parts of one work as a single PDF."""

    template_name = "library/work_partset.html"

    def post(self, request, *args, **kwargs):
        """Return an inline PDF with the requested number of copies per part.

        ``parts`` and ``copies`` are parallel form lists: entry *i* of
        ``copies`` is the copy count for the section tag at entry *i* of
        ``parts``. Tags with a count of zero or less are left out.

        Raises:
            SuspiciousOperation: a part has no copy count, or a count is not
                an integer (rendered by Django as a 400 response).
        """
        work = self.get_object()
        parts = request.POST.getlist("parts")
        copies = request.POST.getlist("copies")
        if len(copies) < len(parts):
            # Previously surfaced as an IndexError (HTTP 500).
            raise SuspiciousOperation("Missing copy count for one or more parts")

        sections = []
        for tag, raw_count in zip(parts, copies):
            try:
                count = int(raw_count)
            except ValueError:
                raise SuspiciousOperation(
                    f"Not a valid copy count: {raw_count}"
                ) from None
            if count <= 0:
                continue
            matching = models.Section.objects.select_related("doc").filter(
                tag=tag, doc__work=work
            )
            sections.extend(
                (part.doc.upload.path, part.name, part.start, part.end, count)
                for part in matching
            )

        result = extract_and_concat(sections)
        download_name = f"{work.name}.pdf"
        response = FileResponse(result, content_type="application/pdf")
        response["Content-Disposition"] = f'inline; filename="{download_name}"'
        return response

    def get_queryset(self):
        """Return all works for admins, else works whose collection allows
        the request's ensemble."""
        # NOTE(review): unlike CollectionMixin.get_queryset this does not
        # filter on the collection in the URL — confirm that is intended.
        works = Work.objects.all()
        if not self.request.is_admin:
            works = works.filter(
                collection__allowed_ensembles__ensemble=self.request.ensemble_id
            )
        return works
class WorkDownloadView(CollectionMixin, SingleObjectMixin, View):
    """Download the sections of a work that match the given ``tag`` parameters."""

    model = models.Work

    def get(self, request, *args, **kwargs):
        """Return the tagged sections as one inline PDF.

        A single match starting at page 0 is not extracted; the client is
        redirected to the file's URL instead.

        Raises:
            Http404: no ``tag`` parameter was given, or nothing matches.
        """
        self.object = self.get_object()
        tags = request.GET.getlist("tag")
        if not tags:
            raise Http404("No tags given")

        sections = list(self.object.tagged_sections(*tags))
        # Goes to the module logger rather than stdout.
        logger.debug("Matching sections: %r", sections)
        if not sections:
            raise Http404("No matching sections")

        if len(sections) == 1 and sections[0].start == 0:
            # bypass extraction and redirect to the url
            logger.debug("Redirecting to url")
            return redirect(sections[0].upload.url)

        result = extract_and_concat(
            [(s.upload.path, s.upload.name, s.start, s.end, 1) for s in sections]
        )
        tag_names = " - ".join(str(MusicTag.from_tag(tag)) for tag in tags)
        download_name = f"{self.object.name} - {tag_names}.pdf"
        response = FileResponse(result, content_type="application/pdf")
        response["Content-Disposition"] = f'inline; filename="{download_name}"'
        return response
class WorkAddDocumentView(CollectionMixin, CreateView):
    """Upload one document to a work; answers JSON clients with JSON."""

    template_name = "interface/default_form.html"
    model = Document
    fields = ["upload"]

    def _wants_json(self):
        """Tell whether the Accept header is exactly ``application/json``."""
        # .get(): a request without an Accept header must not raise KeyError.
        return self.request.headers.get("Accept") == "application/json"

    def _get_work(self):
        """Return the target work, limited to the current collection."""
        return Work.objects.get(pk=self.kwargs["pk"], collection=self.collection)

    def title(self):
        """Return the page title naming the target work."""
        work = self._get_work()
        return f"Add a document to {work.name}"

    def form_invalid(self, form):
        """Answer JSON clients with a bare 400; others get the form again."""
        if self._wants_json():
            return HttpResponse(status=400)
        return super().form_invalid(form)

    def form_valid(self, form):
        """Store the upload on the work, auto-tag it and respond.

        The doctype comes from the file extension through
        ``Document.DOCTYPE_MAP``, falling back to ``DOCTYPE_MISC``. JSON
        clients receive 201 with the new id and a rendered entry; other
        clients are redirected to the annotation page.

        Raises:
            Http404: the work does not exist in this collection.
        """
        # Check the target before saving anything, so a document cannot be
        # attached to a work outside the authorized collection.
        try:
            work = self._get_work()
        except Work.DoesNotExist:
            raise Http404("No such work in this collection") from None

        orig_name, ext = os.path.splitext(form.cleaned_data["upload"].name)
        logger.info("Uploaded: %s", orig_name)

        doc = form.save(commit=False)
        doc.doctype = models.Document.DOCTYPE_MAP.get(
            ext.lower(), models.Document.DOCTYPE_MISC
        )
        doc.work = work
        doc.save()
        # auto tag the document
        doc.auto_tag()

        if self._wants_json():
            entry = render_to_string(
                "library/document_entry.html",
                {
                    "collection": self.collection,
                    "doc": doc,
                    "request": self.request,
                },
            )
            return JsonResponse(
                {"message": "created", "id": doc.pk, "entry": entry},
                status=201,
            )
        return redirect("document_annotate", self.collection.pk, doc.pk)
class WorkAddDocumentBulkView(CollectionMixin, FormView):
    """Import every file of a storage folder link as documents of a work."""

    template_name = "interface/default_form.html"
    form_class = forms.DocumentBulkForm

    @property
    def cancel_url(self):
        """URL of the work's detail page."""
        return resolve_url("work_detail", self.collection.pk, self.kwargs["pk"])

    def form_valid(self, form):
        """Create a PDF document for each link not already stored on the work.

        Raises:
            Http404: the work does not exist in this collection.
        """
        folder_link = form.cleaned_data["folder_link"]
        work = get_object_or_404(self.collection.works, pk=self.kwargs["pk"])
        current = set(work.docs.values_list("upload", flat=True))
        logger.info("Current documents: %r", current)

        storage = self.collection.storage
        for link in storage.instance().folder_import(folder_link):
            uri = f"{storage.name}:{link}"
            if uri in current:
                continue
            doc = work.docs.create(upload=uri, doctype=Document.DOCTYPE_PDF)
            doc.auto_tag()
            # Remember it so a link yielded twice is only created once.
            current.add(uri)
        return redirect("work_detail", self.collection.pk, self.kwargs["pk"])
class DocumentMixin(CollectionMixin):
    """Shared queryset logic for the document views."""

    model = models.Document

    def get_queryset(self):
        """Return documents with their work joined.

        When ``request.is_admin`` is falsy, only documents whose work
        belongs to the current collection are included.
        """
        documents = models.Document.objects.select_related("work")
        if not self.request.is_admin:
            documents = documents.filter(work__collection=self.collection)
        return documents
class DocumentDetailView(DocumentMixin, DetailView):
    """Detail page for one document; all behaviour comes from the base classes."""
class DocumentDownloadView(DocumentMixin, SingleObjectMixin, View):
    """Serve a document's file, or redirect to it for ``?method=direct``."""

    def get(self, request, *args, **kwargs):
        """Return the file as a ``FileResponse``, or a redirect to its URL.

        The content type is guessed from the stored file name and falls back
        to ``application/pdf`` when the name gives no hint.
        """
        # Local import keeps this block self-contained.
        import mimetypes

        # request/args/kwargs are already set on the view by View.setup(), so
        # they are not reassigned here (self.args used to be overwritten with
        # the kwargs dict).
        self.object = self.get_object()
        if request.GET.get("method") == "direct":
            return redirect(self.object.upload.url)

        # Uploads may be of other types than PDF (see Document.DOCTYPE_MAP),
        # so do not label every file as a PDF.
        guessed_type, _ = mimetypes.guess_type(self.object.upload.name)
        return FileResponse(
            self.object.upload.open("rb"),
            content_type=guessed_type or "application/pdf",
        )
class DocumentAnnotateView(DocumentMixin, DetailView):
    """Page for tagging page ranges of a document as sections."""

    template_name = "library/document_annotate.html"

    def post(self, request, *args, **kwargs):
        """Replace the document's sections with the posted ones.

        The body is a JSON list of ``[tag, start, end]`` triples. Existing
        sections are deleted and the new ones created in one transaction.

        Returns:
            204 on success, 400 when the body is not such a list.
        """
        self.object = self.get_object()

        # Validate before deleting anything.
        try:
            payload = json.loads(request.body)
            if not isinstance(payload, list):
                raise ValueError("expected a list of sections")
            sections = [(tag, start, end) for tag, start, end in payload]
        except (TypeError, ValueError):
            # Invalid JSON, not a list, or an entry that is not a triple.
            return HttpResponse(status=400)

        with transaction.atomic():
            self.object.sections.all().delete()
            for tag, start, end in sections:
                self.object.sections.create(tag=tag, start=start, end=end)
        return HttpResponse(status=204)

    def get_context_data(self, **kwargs):
        """Add the signed download URL and the JSON data for the page."""
        data = super().get_context_data(**kwargs)
        pages = [
            (part.tag, part.start, part.end)
            for part in data["document"].sections.all()
        ]
        data["url"] = signed_url(
            "document_download",
            collection=data["collection"].pk,
            pk=data["document"].pk,
        )
        data["json_data"] = {"pageTags": pages, "instruments": dict(MUSIC_TAGS)}
        return data
class DocumentDeleteView(DocumentMixin, DeleteView):
    """Delete a document, then return to the detail page of its work."""

    def get_success_url(self):
        """Return the ``work_detail`` URL of the deleted document's work."""
        work_pk = self.object.work_id
        return resolve_url("work_detail", self.collection.pk, work_pk)
class PartDownloadView(CollectionMixin, SingleObjectMixin, View):
    """Download a single section (part) of a document as a PDF."""

    pk_url_kwarg = "section"

    def get(self, request, *args, **kwargs):
        """Return the section's pages as an inline PDF.

        A section without a start page is not extracted; the client is
        redirected to the document's URL. The download name is the
        ``filename`` URL kwarg.
        """
        # request/args/kwargs are already set on the view by View.setup();
        # the URL kwargs are read from self.kwargs instead of overwriting
        # self.args with them.
        self.object = self.get_object()
        if self.object.start is None:
            return redirect(self.object.doc.upload.url)

        result = extract_pages(
            self.object.doc.upload.path,
            self.object.doc.work.name,
            self.object.start,
            self.object.end,
        )
        filename = self.kwargs["filename"]
        response = FileResponse(result, content_type="application/pdf")
        response["Content-Disposition"] = f'inline; filename="{filename}"'
        return response

    def get_object(self):
        """Return the section from the URL, limited to the current collection.

        Raises:
            Http404: no such section exists in this collection.
        """
        sections = Section.objects.filter(
            doc__work__collection=self.collection
        ).select_related("doc", "doc__work")
        return get_object_or_404(sections, pk=self.kwargs["section"])
class StorageBrowserView(CollectionMixin, TemplateView):
    """Browse the folders and files of the collection's storage."""

    template_name = "library/storage_browser.html"

    def get_context_data(self, **kwargs):
        """Add ``folders`` and ``files`` for the requested folder.

        The folder comes from the ``folder`` URL kwarg and falls back to the
        collection's ``prefix`` when that is missing or empty.
        """
        data = super().get_context_data(**kwargs)
        collection = data["collection"]
        # NOTE(review): the folder value comes from the URL — confirm the
        # storage backend keeps listings inside the collection's prefix.
        folder = self.kwargs.get("folder") or collection.prefix
        backend = collection.storage.instance()
        folders, files = backend.listdir(folder)
        data["folders"] = folders
        data["files"] = files
        return data