from django.shortcuts import get_object_or_404, redirect, resolve_url from django.views.generic import TemplateView from django.views.generic.detail import DetailView, SingleObjectMixin, View from django.views.generic.list import ListView from django.views.generic.edit import CreateView, FormView, UpdateView, DeleteView from django.http import FileResponse, HttpResponse, JsonResponse from django.db.models import Q, Sum from django.db import transaction from django.utils.timezone import now from django.template.loader import render_to_string from django.core.exceptions import SuspiciousOperation from django.core.paginator import Paginator from django.http import Http404, HttpResponseRedirect import json import os.path import string from interface.views import ProjectMixin, AuthorizedResourceMixin from interface.utils import signed_url from library.models import Collection, Work, Document, Section from library.music_tags import MUSIC_TAGS, MusicTag, auto_tag from library import forms, models from library.pdf_utils import extract_pages, extract_and_concat from library.indexer import indexer, model_search import logging logger = logging.getLogger(__name__) class ProjectItemListView(ProjectMixin, ListView): template_name = "library/item_list.html" model = models.ProjectItem def post(self, request, **kwargs): project_works = self.project.works.all() instruments = request.POST.getlist("instruments") works = request.POST.getlist("works") self.request.session["part"] = request.POST.get("part", "") self.request.session["instrument"] = request.POST.get("instrument") valid_pks = [x.pk for x in project_works] sections = [] for i, pk in enumerate(works): if int(pk) not in valid_pks: raise Exception(f"Not a valid work pk: {pk}") tag = instruments[i] if tag == "-": continue part = ( Section.objects.filter(tag=tag, doc__work=pk) .select_related("doc") .get() ) sections.append( (part.doc.upload.path, part.doc.work.name, part.start, part.end, 1) ) result = extract_and_concat(sections) download_name = f"{self.project.name}.pdf" response = FileResponse(result, content_type="application/pdf") response["Content-Disposition"] = f'inline; filename="{download_name}"' return response def get_queryset(self): return ( super(ProjectItemListView, self) .get_queryset() .select_related("project", "work") ) def get_context_data(self, **kwargs): data = super(ProjectItemListView, self).get_context_data(**kwargs) data["instruments"] = MUSIC_TAGS data["instrument"] = self.request.session.get("instrument", "Score") data["part"] = self.request.session.get("part", "0") data["running_time"] = self.get_queryset().aggregate(Sum("work__running_time"))[ "work__running_time__sum" ] return data class ProjectItemManageView(ProjectMixin, ListView): template_name = "library/item_list_manage.html" model = models.ProjectItem def post(self, request, **kwargs): self.request = request self.kwargs = kwargs data = json.loads(request.body) q = self.get_queryset() for pk, order in data.items(): order = int(order) if order == -1: q.filter(pk=pk).delete() else: q.filter(pk=pk).update(order=order) return HttpResponse(status=204) def get_queryset(self): return ( super(ProjectItemManageView, self) .get_queryset() .select_related("project", "work") ) class ProjectItemAddView(ProjectMixin, UpdateView): form_class = forms.PlaylistAddForm template_name = "interface/default_form.html" def get_success_url(self): return resolve_url("item_list_manage", project=self.kwargs["project"]) def get_object(self, queryset=None): return self.get_project() """ COLLECTION VIEWS """ class CollectionMixin(AuthorizedResourceMixin): collection: Collection | None = None def is_authorized(self): collection_id = self.kwargs["collection"] self.collection = get_object_or_404(models.Collection, pk=collection_id) if super().is_authorized(): return True if self.collection.has_administrator(self.request.user): self.request.is_admin = True return True if self.is_authorized_key("collection", collection_id, self.collection.nonce): return True return False def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) if self.collection: data["collection"] = self.collection return data def get_queryset(self): return super().get_queryset().filter(collection=self.collection) class CollectionListView(ListView): paginate_by = 20 def get_queryset(self): collections = models.Collection.objects.order_by("name") if self.request.user.is_anonymous: return models.Collection.objects.none() if self.request.user.is_staff: return collections return collections.filter( Q(administrators=self.request.user) | Q(allowed_ensembles__ensemble__admins=self.request.user) ) class WorkListView(CollectionMixin, TemplateView): paginate_by = 20 template_name = "library/work_list.html" def get_context_data(self, *args, **kwargs): data = super(WorkListView, self).get_context_data(*args, **kwargs) data["title"] = "My Library" data["collection"] = self.collection data["query"] = self.request.GET.get("q", "") page = int(self.request.GET.get("page", 1)) # type: ignore if data["query"]: qs = self.get_results(data["query"], page) data["meta"] = qs.meta # data["page_range"] = data["page_obj"]["paginator"] else: qs = self.get_queryset() start = self.request.GET.get("start") if start: start = start.upper() qs = qs.filter(name__gte=start, name__lt=start + "~") data["start"] = start data["letters"] = string.ascii_uppercase paginator = Paginator(qs, self.paginate_by) data["page_obj"] = paginator.get_page(page) data["object_list"] = data["page_obj"] data["page_range"] = paginator.get_elided_page_range(page) return data def get_works(self): raise NotImplementedError def get_collections(self): raise NotImplementedError def get_queryset(self): works = self.get_works() return works.order_by("name", "composer", "edition", "pk").distinct() def get_results(self, query, page): try: collections = self.get_collections() except IndexError: return [] return model_search(query, collections, page, self.paginate_by) class LibraryWorkListView(WorkListView): def is_authorized(self): return True def get_collections(self): collections = models.Collection.objects.all() if self.request.user.is_superuser: return [] collections = models.Collection.objects.filter(administrators=self.request.user) return collections.values_list("pk", flat=True) def get_works(self): if self.request.user.is_superuser: return Work.objects.all() return Work.objects.filter(collection_id__in=self.get_collections()) class CollectionWorkListView(WorkListView): def request_denied(self): if "auth" in self.request.GET: if self.request.GET["auth"] != self.collection.auth(): raise SuspiciousOperation("Bad collection link") self.add_authorized_key( "collection", self.collection.pk, self.collection.nonce ) return HttpResponseRedirect(self.request.path) return super().request_denied() def get_collections(self): return [int(self.kwargs["collection"])] def get_works(self): works = Work.objects.filter(collection=self.kwargs["collection"]) # if self.request.is_admin: # loan_count = Count('project_items', Q(project_items__checkout__lte=now(), project_items__returned=None)) # works = works.annotate(loan_count=loan_count) return works def get_context_data(self, *args, **kwargs): data = super(CollectionWorkListView, self).get_context_data(*args, **kwargs) data["title"] = self.collection.name return data class WorkAddView(CollectionMixin, FormView): template_name = "interface/default_form.html" form_class = forms.WorkCreateForm title = "Add a new work" def get_form(self, form_class=None): form = super().get_form(form_class) qs = models.Orchestration.objects.filter( Q(collection=None) | Q(collection=self.collection) ) form.fields["orchestration"].queryset = qs.order_by("-collection_id", "pk") return form def form_valid(self, form): work = form.save(commit=False) # work.ensemble_id = self.request.ensemble_id work.collection_id = self.collection.pk work.save() # handle the files uploads = self.request.FILES.getlist("uploads") docs = [] for f in uploads: docs.append(work.docs.create(upload=f).pk) ix = indexer.get_index() indexer.index_works(ix, [work]) if len(docs) == 1: return redirect("document_annotate", docs[0]) else: return redirect("work_detail", collection=self.collection.pk, pk=work.pk) class WorkDetailView(CollectionMixin, DetailView): model = models.Work class WorkUpdateView(CollectionMixin, UpdateView): model = models.Work form_class = forms.WorkCreateForm template_name = "interface/default_form.html" def form_valid(self, form): response = super().form_valid(form) ix = indexer.get_index() indexer.index_works(ix, [self.object]) return response def get_success_url(self): return resolve_url("work_detail", self.collection.pk, self.kwargs["pk"]) class WorkAddToProject(CollectionMixin, FormView): admin_required = True form_class = forms.ProjectSelectForm template_name = "interface/default_form.html" title = "Select project to add work to" def get_object(self): return Work.objects.get(pk=self.kwargs["pk"]) def get_form(self): f = super(WorkAddToProject, self).get_form() qs = f.fields["project"].queryset.select_related("ensemble") # Limit to projects for ensembles where we are an admin and they haven't occured yet qs = qs.for_user(self.request.user).current() # dont show projects already added to work = self.get_object() qs = qs.exclude(pk__in=work.projects.all()) f.fields["project"].queryset = qs.order_by("ensemble__name", "name") return f def form_valid(self, form): work = self.get_object() project = form.cleaned_data["project"] work.project_items.create( project=project, approved_by=self.request.user, checkout=now() ) return redirect("item_list", project=project.pk) class WorkPartsView(CollectionMixin, DetailView): model = models.Work template_name = "library/work_parts_fragment.html" class WorkPartSetView(CollectionMixin, DetailView): template_name = "library/work_partset.html" def post(self, request, *args, **kwargs): work = self.get_object() parts = request.POST.getlist("parts") copies = request.POST.getlist("copies") sections = [] for i, tag in enumerate(parts): c = int(copies[i]) if c > 0: for part in models.Section.objects.select_related("doc").filter( tag=tag, doc__work=work ): sections.append( (part.doc.upload.path, part.name, part.start, part.end, c) ) result = extract_and_concat(sections) download_name = f"{work.name}.pdf" response = FileResponse(result, content_type="application/pdf") response["Content-Disposition"] = f'inline; filename="{download_name}"' return response def get_queryset(self): works = Work.objects.all() if not self.request.is_admin: works = works.filter( collection__allowed_ensembles__ensemble=self.request.ensemble_id ) return works class WorkDownloadView(CollectionMixin, SingleObjectMixin, View): model = models.Work def get(self, request, *args, **kwargs): self.object = self.get_object() tags = request.GET.getlist("tag") if not tags: raise Http404("No tags given") sections = list(self.object.tagged_sections(*tags)) print(sections) if len(sections) == 0: raise Http404("No matching sections") if len(sections) == 1 and sections[0].start == 0: # bypass extraction and redirect to the url logger.debug("Redirecting to url") return redirect(sections[0].upload.url) result = extract_and_concat( [(s.upload.path, s.upload.name, s.start, s.end, 1) for s in sections] ) tag_names = " - ".join([str(MusicTag.from_tag(tag)) for tag in tags]) download_name = f"{self.object.name} - {tag_names}.pdf" response = FileResponse(result, content_type="application/pdf") response["Content-Disposition"] = f'inline; filename="{download_name}"' return response class WorkAddDocumentView(CollectionMixin, CreateView): template_name = "interface/default_form.html" model = Document fields = ["upload"] def title(self): work = Work.objects.get(pk=self.kwargs["pk"]) return f"Add a document to {work.name}" def form_invalid(self, form): if self.request.headers["Accept"] == "application/json": return HttpResponse(status=400) return super().form_invalid(form) def form_valid(self, form): orig_name, ext = os.path.splitext(form.cleaned_data["upload"].name) logger.info("Uploaded: %s", orig_name) doc = form.save(commit=False) doc.doctype = models.Document.DOCTYPE_MAP.get( ext.lower(), models.Document.DOCTYPE_MISC ) doc.work_id = self.kwargs["pk"] doc.save() # auto tag the document # name, ext = os.path.splitext(os.path.basename(doc.upload.name)) if doc.doctype == models.Document.DOCTYPE_PDF: inst = auto_tag(orig_name) if inst: doc.sections.create(tag=inst.abbreviate()) if self.request.headers["Accept"] == "application/json": os.path.basename(doc.upload.name) return JsonResponse( { "message": "created", "id": doc.pk, "entry": render_to_string( "library/document_entry.html", { "collection": self.collection, "doc": doc, "request": self.request, }, ), }, status=201, ) return redirect("document_annotate", self.collection.pk, doc.pk) class DocumentMixin(CollectionMixin): model = models.Document def get_queryset(self): qs = models.Document.objects.select_related("work") if self.request.is_admin: return qs return qs.filter(work__collection=self.collection) class DocumentDetailView(DocumentMixin, DetailView): pass class DocumentDownloadView(DocumentMixin, SingleObjectMixin, View): def get(self, request, **args): self.request = request self.args = args self.object = self.get_object() if request.GET.get("method") == "direct": return redirect(self.object.upload.url) response = FileResponse( self.object.upload.open("rb"), content_type="application/pdf" ) return response class DocumentAnnotateView(DocumentMixin, DetailView): template_name = "library/document_annotate.html" def post(self, request, **args): self.request = request self.args = args self.object = self.get_object() data = json.loads(request.body) with transaction.atomic(): self.object.sections.all().delete() for tag, start, end in data: # pages.sort() # end = pages[-1] if len(pages) > 1 else None self.object.sections.create(tag=tag, start=start, end=end) return HttpResponse(status=204) def get_context_data(self, **kwargs): data = super(DocumentAnnotateView, self).get_context_data(**kwargs) pages = [] for part in data["document"].sections.all(): pages.append((part.tag, part.start, part.end)) data["url"] = signed_url( "document_download", collection=data["collection"].pk, pk=data["document"].pk, ) data["json_data"] = {"pageTags": pages, "instruments": dict(MUSIC_TAGS)} return data class DocumentDeleteView(DocumentMixin, DeleteView): # def get_template_names(self): # return ["interface/default_form.html"] def get_success_url(self): return resolve_url("work_detail", self.collection.pk, self.object.work_id) class PartDownloadView(CollectionMixin, SingleObjectMixin, View): pk_url_kwarg = "section" def get(self, request, **args): self.request = request self.args = args self.object = self.get_object() if self.object.start is None: return redirect(self.object.doc.upload.url) result = extract_pages( self.object.doc.upload.path, self.object.doc.work.name, self.object.start, self.object.end, ) # download_name = f'{self.object.doc.work.name}_{self.object.instrument}.pdf' response = FileResponse(result, content_type="application/pdf") response["Content-Disposition"] = f'inline; filename="{self.args["filename"]}"' return response def get_object(self): return ( Section.objects.filter(doc__work__collection=self.collection) .select_related("doc", "doc__work") .get(pk=self.kwargs["section"]) ) class StorageBrowserView(CollectionMixin, TemplateView): template_name = "library/storage_browser.html" def get_context_data(self, **kwargs): data = super().get_context_data(**kwargs) folder = self.kwargs.get("folder") or data["collection"].prefix data["folders"], data["files"] = ( data["collection"].storage.instance().listdir(folder) ) return data