whoosh_search #11

Merged
tris merged 2 commits from whoosh_search into master 2026-05-11 21:55:54 +10:00
8 changed files with 532 additions and 146 deletions

1
.gitignore vendored
View File

@ -16,3 +16,4 @@ poetry.lock
/cache
/local_storage
/media
/index

View File

@ -0,0 +1,103 @@
from typing import Protocol, Any, Iterable, Generator
from django.conf import settings
from django.utils.module_loading import import_module
from django.db.models import QuerySet
from library.models import Work, Collection
# Dotted path of the module that implements the indexer backend. Read from the
# optional INDEXER setting; falls back to "library.indexer.whoosh" when unset.
instance = getattr(settings, "INDEXER", "library.indexer.whoosh")
class Indexer(Protocol):
    """Structural interface an indexer backend has to provide.

    The backend is whatever object ``import_module`` returns for the
    configured dotted path, so in practice these are the module-level
    functions of the backend module.

    The three index-management calls return the backend's own index handle.
    Callers use that handle directly (for example to open a searcher), so the
    return type is backend specific and is annotated as ``Any`` rather than
    ``None``.
    """

    def create_index(self) -> Any: ...

    def get_index(self) -> Any: ...

    def reset_index(self) -> Any: ...

    def search(
        self,
        query: str,
        collections: list[int],
        page: int = 1,
        pagesize: int = 20,
    ) -> tuple[list[dict], dict[str, Any]]: ...

    def index_docs(self, docs: Iterable[dict]) -> None: ...
class PartialResultSet:
    """
    One page of search hits that poses as the complete result sequence.

    Implements enough of QuerySet to fool the Paginator!

    ``works`` holds only the hits of the page described by ``meta`` (keys
    ``page``, ``pagesize`` and ``total``). Slicing is only honoured when the
    slice starts at that page's offset.
    """

    def __init__(self, works, meta):
        self.works = works
        self.meta = meta
        # Offset of the first hit of the held page within the full result list.
        page_index = meta["page"] - 1
        self.start = page_index * meta["pagesize"]

    def __getitem__(self, key):
        # Plain indexing goes straight to the held page.
        if not isinstance(key, slice):
            return self.works[key]
        # A slice must ask for exactly the page we were built for.
        if key.start == self.start:
            return self.works
        raise KeyError(f"Expected {self.start}, got {key.start}")

    def count(self):
        """Return the total number of hits across all pages."""
        return self.meta["total"]
# Import the configured backend module once, at import time, and expose it as
# `indexer`. The module object is annotated as an Indexer; the type checker is
# silenced on this line (presumably because a module cannot be checked against
# the Protocol).
indexer: Indexer = import_module(instance) # type: ignore
def work_to_doc(work: Work) -> dict[str, str]:
    """Flatten *work* into the field dict that is handed to the indexer.

    ``tag`` is the lower-cased, comma-joined list of the work's ``meta_info``
    values whose name is "tag". ``text`` is the free-text blob: name,
    composer, edition and notes on separate lines, followed by every
    ``meta_info`` value.
    """
    meta_info = work.meta_info
    tag_values = meta_info.filter(name="tag").values_list("value", flat=True)  # type: ignore
    all_values = meta_info.values_list("value", flat=True)  # type: ignore
    tags = ",".join(tag_values)
    meta = ", ".join(all_values)
    description = f"""{work.name}
{work.composer}
{work.edition}
{work.notes}
{meta}
"""
    return {
        "work": str(work.pk),
        "collection": str(work.collection_id),  # type: ignore
        "name": work.name,
        "composer": work.composer,
        "edition": work.edition,
        "tag": tags.lower(),
        "text": description,
    }
def doc_set(works: QuerySet) -> Generator:
    """Lazily yield one index document per work in *works*."""
    yield from map(work_to_doc, works)
def index_works(works: QuerySet):
    """Convert *works* to index documents and pass them to the backend."""
    documents = doc_set(works)
    indexer.index_docs(documents)
def model_search(
    query: str, collections: list[int], page: int = 1, pagesize: int = 20
) -> PartialResultSet:
    """Run *query* against the index and wrap one page of hits for pagination.

    Each hit dict returned by the backend is turned into an unsaved ``Work``
    instance. Every work gets a ``Collection`` stub carrying only ``pk`` and
    ``name`` so that ``work.collection.name`` is available on the result.

    Args:
        query: Raw query string; it is lower-cased before being passed on.
        collections: Collection ids passed to the backend as the search scope.
        page: 1-based page number requested.
        pagesize: Number of hits per page.

    Returns:
        A ``PartialResultSet`` holding the page's works and the backend's
        metadata, extended with ``page`` and ``pagesize``.
    """
    hits, meta = indexer.search(query.lower(), collections, page, pagesize)
    meta["pagesize"] = pagesize

    # Django's Paginator.get_page() silently falls back to the last page when
    # the requested one is past the end, and then slices at that page's
    # offset. Record the same effective page here, otherwise
    # PartialResultSet.start disagrees with the slice and raises KeyError.
    last_page = max(1, (meta["total"] + pagesize - 1) // pagesize)
    meta["page"] = min(page, last_page)

    works = [Work(**hit) for hit in hits]
    collection_names = dict(Collection.objects.values_list("pk", "name"))
    for work in works:
        work.collection = Collection(  # type: ignore
            pk=work.collection_id, name=collection_names[work.collection_id]
        )
    return PartialResultSet(works, meta)

View File

@ -0,0 +1,93 @@
from whoosh.index import create_in, open_dir, EmptyIndexError, Index
from whoosh.analysis import StemmingAnalyzer, CharsetFilter
from whoosh.support.charset import accent_map
from whoosh.fields import Schema, ID, TEXT, KEYWORD, STORED, NUMERIC
from whoosh.qparser import QueryParser
from whoosh.query import Term, NullQuery
from library.models import Work
from typing import Any
from django.conf import settings
from django.db.models import QuerySet
import os.path
import shutil
# Analyzer for the free-text field: Whoosh's stemming analyzer piped through a
# charset filter built from accent_map.
# NOTE(review): presumably this makes accented and unaccented spellings match;
# confirm against the Whoosh analysis docs.
stemming_analyzer = StemmingAnalyzer() | CharsetFilter(accent_map)
# Index schema. work, collection, name, composer and edition are stored; these
# are the fields search() reads back from each hit. tag and text are declared
# without stored=True.
schema = Schema(
work=NUMERIC(stored=True, unique=True),
collection=NUMERIC(stored=True),
name=TEXT(stored=True),
composer=TEXT(stored=True),
edition=TEXT(stored=True),
tag=KEYWORD(commas=True),
text=TEXT(analyzer=stemming_analyzer),
)
# The index is kept in an "index" directory inside the parent directory of
# settings.BASE_DIR.
index_path = os.path.join(os.path.dirname(settings.BASE_DIR), "index")
def create_index() -> Index:
    """Create an index for ``schema`` in ``index_path`` and return it.

    The directory is created first when it does not exist yet.
    """
    # makedirs(exist_ok=True) replaces the exists()/mkdir() pair: it has no
    # check-then-create race and also copes with missing parent directories.
    os.makedirs(index_path, exist_ok=True)
    return create_in(index_path, schema)
def get_index() -> Index:
    """Open the index stored in ``index_path``.

    When Whoosh reports that there is no index there (``EmptyIndexError``), a
    new one is created and returned instead.
    """
    try:
        existing = open_dir(index_path)
    except EmptyIndexError:
        return create_index()
    return existing
def reset_index() -> Index:
    """Delete the index directory and return a freshly created, empty index."""
    try:
        shutil.rmtree(index_path)
    except FileNotFoundError:
        # Nothing has been indexed yet, so there is nothing to remove. Other
        # errors (e.g. permissions) still propagate.
        pass
    return create_index()
def index_docs(works: list[dict]):
    """Add or replace one index document per dict in *works*.

    Each dict is passed as keyword arguments to the writer's
    ``update_document``. The writer is used as a context manager around the
    whole batch.
    """
    with get_index().writer() as batch:
        for document in works:
            batch.update_document(**document)
def search(
    query: str,
    collections: "list[int] | None" = None,
    page: int = 1,
    pagesize: int = 20,
) -> tuple[list[dict], dict[str, Any]]:
    """Search the ``text`` field and return one page of hits plus metadata.

    Args:
        query: Query string in Whoosh query syntax; it is lower-cased before
            parsing.
        collections: Collection ids to restrict the search to. ``None`` or an
            empty list adds no collection terms to the query.
        page: 1-based page number passed to ``search_page``.
        pagesize: Number of hits per page.

    Returns:
        ``(hits, meta)``. Each hit is a dict with ``pk``, ``name``,
        ``composer``, ``edition`` and ``collection_id``. ``meta["query"]`` is
        the parsed query before the collection restriction is applied, and
        ``meta["total"]`` is ``len()`` of the results page.
    """
    meta: dict[str, Any] = {}
    parsed = QueryParser("text", schema=schema).parse(query.lower())
    meta["query"] = str(parsed)

    # OR together one term per allowed collection, then AND that with the
    # user's query.
    restriction = NullQuery
    for collection_id in collections or ():
        restriction = restriction | Term("collection", collection_id)
    combined = parsed & restriction

    hits = []
    with get_index().searcher() as searcher:
        results = searcher.search_page(combined, page, pagesize)
        for result in results:
            hits.append(
                {
                    "pk": result["work"],
                    "name": result["name"],
                    "composer": result["composer"],
                    "edition": result["edition"],
                    "collection_id": int(result["collection"]),
                }
            )
        meta["total"] = len(results)
    return hits, meta

View File

@ -0,0 +1,36 @@
from django.core.management.base import BaseCommand, CommandError
from library import models
from library.indexer import model_search, index_works, indexer
# Row layout used when printing search results: name, edition and collection
# name, padded to at least 50, 15 and 15 characters respectively.
FORMATTER = "{w.name:50s} {w.edition:15s} {w.collection.name:15s}"
class Command(BaseCommand):
    """Management command for building and inspecting the search index.

    Actions:
        run:    index every Work.
        search: print the first page of hits for the query.
        terms:  print the terms stored for a field (the query argument is
                used as the field name, defaulting to "text").
    """

    help = "Builds (run), queries (search) or inspects (terms) the search index"

    def add_arguments(self, parser):
        parser.add_argument("action", choices=("run", "search", "terms"))
        parser.add_argument("query", nargs="*")
        parser.add_argument("--collection", "-c", nargs="*", type=int)

    def handle(self, action, query, *args, **options):
        """Dispatch to ``handle_<action>`` with the query and collection ids."""
        try:
            method = getattr(self, f"handle_{action}")
        except AttributeError:
            raise RuntimeError(f"Unknown handler: {action}")
        # argparse splits the query on whitespace; re-join with spaces so a
        # multi-word query is not glued into a single token.
        return method(" ".join(query), options["collection"] or [])

    def handle_run(self, query, collections=None):
        """Index all works; *query* and *collections* are ignored."""
        index_works(models.Work.objects.all())

    def handle_search(self, query, collections=None):
        """Print one formatted row per hit on the first result page."""
        for result in model_search(query, collections or []):
            print(FORMATTER.format(w=result))

    def handle_terms(self, query, collections=None):
        """Print the lexicon of the field named by *query* (default "text")."""
        ix = indexer.get_index()
        with ix.searcher() as searcher:
            # The lexicon yields bytestrings; decode them for display and keep
            # any non-text bytes visible as escapes instead of failing.
            terms = (
                term.decode("utf-8", errors="backslashreplace")
                for term in searcher.lexicon(query or "text")
            )
            print(", ".join(terms))

View File

@ -7,7 +7,7 @@
<form method="GET" action="{% url 'work_list' %}">
<div class="field has-addons">
<div class="control is-expanded">
<input class="input" name="filter" type="text" placeholder="Find a work" value="{{ request.GET.filter }}"/>
<input class="input" name="q" type="text" placeholder="Find a work" value="{{ request.GET.q }}"/>
</div>
<div class="control">
<a class="button" href="?"><i class="fas fa-times"></i></a>
@ -32,10 +32,10 @@
</p>
<p>
{% for tag in collection.tags %}
<a href="{% url 'collection_work_list' collection.pk %}?filter=tag:{{ tag }}" class="tag is-success">{{ tag }}</a>
<a href="{% url 'collection_work_list' collection.pk %}?q=tag:{{ tag }}" class="tag is-success">{{ tag }}</a>
{% endfor %}
{% for genre in collection.genres %}
<a href="{% url 'collection_work_list' collection.pk %}?filter=genre:{{ genre }}" class="tag is-warning">{{ genre }}</a>
<a href="{% url 'collection_work_list' collection.pk %}?q=genre:{{ genre }}" class="tag is-warning">{{ genre }}</a>
{% endfor %}
</p>
</div>
@ -47,4 +47,4 @@
<div>
<small>{{ ensemble.ensemble_code }}</small>
</div>
{% endblock %}
{% endblock %}

View File

@ -15,7 +15,7 @@
<form method="GET">
<div class="field has-addons">
<div class="control is-expanded">
<input class="input" name="filter" type="text" placeholder="Filter" value="{{ request.GET.filter }}"/>
<input class="input" name="q" type="text" placeholder="Filter" value="{{ request.GET.q }}"/>
</div>
<div class="control">
<a class="button" href="?"><i class="fas fa-times"></i></a>
@ -23,6 +23,15 @@
</div>
</form>
{% if letters %}
<div class="has-text-centered">
<a href="?">_</a>
{% for letter in letters %}
<a href="?start={{ letter }}">{{ letter }}</a>&nbsp;
{% endfor %}
</div>
{% endif %}
<table class="table is-striped is-fullwidth">
<thead>
<tr>
@ -35,7 +44,9 @@
<tbody>
{% for work in object_list %}
<tr>
<td><a href="{% url 'work_detail' collection=work.collection.pk pk=work.pk %}">{{ work.name }}</a></td>
<td>
<a href="{% url 'work_detail' collection=work.collection_id pk=work.pk %}">{{ work.name }}</a>
</td>
<td title="{{ work.composer }}">{{ work.composer|truncatewords:3 }}</td>
<td class="is-hidden-mobile" title="{{ work.edition }}">{{ work.edition|truncatewords:2 }}</td>
{% if not collection %}<td class="is-hidden-touch">{{ work.collection.name }}</td>{% endif %}
@ -46,20 +57,41 @@
</tbody>
</table>
<footer>
<nav class="pagination is-centered" role="navigation" aria-label="pagination">
{% if page_obj.has_previous %}
<a class="pagination-previous" href="{% url_update page=page_obj.prev_page_number %}">Previous</a>
{% endif %}
{% if page_obj.has_next %}
<a class="pagination-next" href="{% url_update page=page_obj.next_page_number %}">Next page</a>
{% endif %}
<a class="pagination-previous"
{% if page_obj.has_previous %}
href="{% url_update page=page_obj.previous_page_number %}"
{% else %}
disabled
{% endif %}>
Previous</a>
<a class="pagination-next"
{% if page_obj.has_next %}
href="{% url_update page=page_obj.next_page_number %}"
{% else %}
disabled
{% endif %}>
Next
</a>
<ul class="pagination-list">
{% for page in page_obj.paginator.page_range %}
{% for page in page_range %}
<li>
<a class="pagination-link {% if forloop.counter == page_obj.number %}is-current{% endif %}" href="{% url_update page=forloop.counter %}" aria-label="Goto page {{ forloop.counter }}">{{ forloop.counter }}</a>
{% if page == page_obj.paginator.ELLIPSIS %}
{{ page }}
{% else %}
<a class="pagination-link {% if page == page_obj.number %}is-current{% endif %}" href="{% url_update page=page %}" aria-label="Goto page {{ page }}">{{ page }}</a>
{% endif %}
</li>
{% endfor %}
</ul>
</nav>
{% endblock %}
{% if meta %}
<div class="has-text-right has-text-grey-light is-size-7">
Query="{{ meta.query }}"
</div>
{% endif %}
</footer>
{% endblock %}

View File

@ -11,10 +11,13 @@ from django.utils.timezone import now
from django.urls import reverse
from django.template.loader import render_to_string
from django.core.exceptions import SuspiciousOperation
from django.core.paginator import Paginator
from django.http import Http404, HttpResponseRedirect
import json
import os.path
import string
import math
import re
from interface.views import EnsembleMixin, ProjectMixin, AuthorizedResourceMixin
@ -24,58 +27,70 @@ from library.models import Collection, Work, Document, Section
from library.music_tags import MUSIC_TAGS, MusicTag, auto_tag
from library import forms, models
from library.pdf_utils import extract_pages, extract_and_concat
from library.indexer import indexer, model_search
import logging
logger = logging.getLogger(__name__)
class ProjectItemListView(ProjectMixin, ListView):
template_name = "library/item_list.html"
model = models.ProjectItem
def post(self, request, **kwargs):
project_works = self.project.works.all()
instruments = request.POST.getlist('instruments')
works = request.POST.getlist('works')
instruments = request.POST.getlist("instruments")
works = request.POST.getlist("works")
self.request.session['part'] = request.POST.get('part', '')
self.request.session['instrument'] = request.POST.get('instrument')
self.request.session["part"] = request.POST.get("part", "")
self.request.session["instrument"] = request.POST.get("instrument")
valid_pks = [ x.pk for x in project_works ]
valid_pks = [x.pk for x in project_works]
sections = []
for i, pk in enumerate(works):
if int(pk) not in valid_pks:
raise Exception(f"Not a valid work pk: {pk}")
tag = instruments[i]
if tag == '-':
if tag == "-":
continue
part = Section.objects.filter(tag=tag, doc__work=pk).select_related('doc').get()
sections.append((part.doc.upload.path, part.doc.work.name, part.start, part.end, 1))
part = (
Section.objects.filter(tag=tag, doc__work=pk)
.select_related("doc")
.get()
)
sections.append(
(part.doc.upload.path, part.doc.work.name, part.start, part.end, 1)
)
result = extract_and_concat(sections)
download_name = f'{self.project.name}.pdf'
download_name = f"{self.project.name}.pdf"
response = FileResponse(result, content_type="application/pdf")
response['Content-Disposition'] = f'inline; filename="{download_name}"'
response["Content-Disposition"] = f'inline; filename="{download_name}"'
return response
def get_queryset(self):
return super(ProjectItemListView, self).get_queryset().select_related('project', 'work')
return (
super(ProjectItemListView, self)
.get_queryset()
.select_related("project", "work")
)
def get_context_data(self, **kwargs):
data = super(ProjectItemListView, self).get_context_data(**kwargs)
data['instruments'] = MUSIC_TAGS
data['instrument'] = self.request.session.get('instrument', 'Score')
data['part'] = self.request.session.get('part', '0')
data['running_time'] = self.get_queryset().aggregate(Sum('work__running_time'))['work__running_time__sum']
data["instruments"] = MUSIC_TAGS
data["instrument"] = self.request.session.get("instrument", "Score")
data["part"] = self.request.session.get("part", "0")
data["running_time"] = self.get_queryset().aggregate(Sum("work__running_time"))[
"work__running_time__sum"
]
return data
@ -100,36 +115,42 @@ class ProjectItemManageView(ProjectMixin, ListView):
return HttpResponse(status=204)
def get_queryset(self):
return super(ProjectItemManageView, self).get_queryset().select_related('project', 'work')
return (
super(ProjectItemManageView, self)
.get_queryset()
.select_related("project", "work")
)
class ProjectItemAddView(ProjectMixin, UpdateView):
form_class = forms.PlaylistAddForm
template_name = "interface/default_form.html"
def get_success_url(self):
return resolve_url('item_list_manage', project=self.kwargs['project'])
return resolve_url("item_list_manage", project=self.kwargs["project"])
def get_object(self):
return self.get_project()
""" COLLECTION VIEWS """
class CollectionMixin(AuthorizedResourceMixin):
collection = None
class CollectionMixin(AuthorizedResourceMixin):
collection: Collection | None = None
def is_authorized(self):
collection_id = self.kwargs['collection']
collection_id = self.kwargs["collection"]
self.collection = get_object_or_404(models.Collection, pk=collection_id)
if super().is_authorized():
return True
if self.collection.has_administrator(self.request.user):
self.request.is_admin = True
return True
if self.is_authorized_key('collection', collection_id, self.collection.nonce):
if self.is_authorized_key("collection", collection_id, self.collection.nonce):
return True
return False
@ -137,85 +158,129 @@ class CollectionMixin(AuthorizedResourceMixin):
def get_context_data(self, **kwargs):
data = super().get_context_data(**kwargs)
if self.collection:
data['collection'] = self.collection
data["collection"] = self.collection
return data
def get_queryset(self):
return super().get_queryset().filter(collection=self.collection)
class CollectionListView(ListView):
paginate_by = 20
def get_queryset(self):
collections = models.Collection.objects.order_by('name')
collections = models.Collection.objects.order_by("name")
if self.request.user.is_anonymous:
return models.Collection.objects.none()
if self.request.user.is_staff:
return collections
return collections.filter(Q(administrators=self.request.user) | Q(allowed_ensembles__ensemble__admins=self.request.user))
class WorkListView(CollectionMixin, ListView):
return collections.filter(
Q(administrators=self.request.user)
| Q(allowed_ensembles__ensemble__admins=self.request.user)
)
class WorkListView(CollectionMixin, TemplateView):
paginate_by = 20
template_name = "library/work_list.html"
def get_context_data(self, *args, **kwargs):
    """Build the template context for a browsable or searched work list.

    With a ``q`` parameter the works come from the search index, otherwise
    from ``get_queryset()`` (optionally narrowed by ``start``). Both paths
    are paginated with ``paginate_by`` items per page.
    """
    data = super(WorkListView, self).get_context_data(*args, **kwargs)
    data["title"] = "My Library"
    data["collection"] = self.collection
    data["query"] = self.request.GET.get("q", "")

    # ?page= is user input: fall back to the first page on junk instead of
    # raising ValueError, and never go below 1.
    try:
        page = max(1, int(self.request.GET.get("page", 1)))  # type: ignore
    except (TypeError, ValueError):
        page = 1

    if data["query"]:
        qs = self.get_results(data["query"], page)
        # get_results() can return a plain empty list, which has no meta.
        meta = getattr(qs, "meta", None)
        if meta is not None:
            data["meta"] = meta
    else:
        qs = self.get_queryset()
        start = self.request.GET.get("start")
        if start:
            start = start.upper()
            # "~" sorts after letters and digits, so this keeps names that
            # begin with `start`.
            qs = qs.filter(name__gte=start, name__lt=start + "~")
            data["start"] = start
        data["letters"] = string.ascii_uppercase

    paginator = Paginator(qs, self.paginate_by)
    page_obj = paginator.get_page(page)
    data["page_obj"] = page_obj
    data["object_list"] = page_obj
    # Use the page get_page() settled on: get_elided_page_range() validates
    # its argument and would raise for an out-of-range request.
    data["page_range"] = paginator.get_elided_page_range(page_obj.number)
    return data
def get_works(self):
raise NotImplementedError
def get_collections(self):
raise NotImplementedError
def get_queryset(self):
works = self.get_works()
return works.order_by("name", "composer", "edition", "pk").distinct()
def get_results(self, query, page):
    """Search the index for *query* within this view's collections.

    Always returns an object exposing ``meta`` and pagination support. When
    ``get_collections()`` raises ``IndexError``, an empty result set is
    returned instead of a bare list.
    """
    try:
        collections = self.get_collections()
    except IndexError:
        # Local import: the module only imports `indexer` and `model_search`.
        from library.indexer import PartialResultSet

        # An empty result has exactly one (empty) page, so describe page 1
        # regardless of what was requested.
        empty_meta = {
            "query": query,
            "page": 1,
            "pagesize": self.paginate_by,
            "total": 0,
        }
        return PartialResultSet([], empty_meta)
    return model_search(query, collections, page, self.paginate_by)
q = self.request.GET.get('filter')
if q:
if ":" in q:
name, _, value = q.partition(":")
works = works.filter(meta_info__name=name, meta_info__value__contains=value)
else:
works = works.filter(Q(name__contains=q) | Q(composer__contains=q) | Q(meta_info__value__contains=q))
return works.order_by('name', 'composer', 'edition', 'pk').distinct()
class LibraryWorkListView(WorkListView):
def is_authorized(self):
return True
def get_works(self):
def get_collections(self):
collections = models.Collection.objects.all()
if not self.request.user.is_superuser:
collections = collections.filter(administrators=self.request.user)
if self.request.user.is_superuser:
return []
collections = models.Collection.objects.filter(administrators=self.request.user)
return collections.values_list("pk", flat=True)
def get_works(self):
if self.request.user.is_superuser:
return Work.objects.all()
return Work.objects.filter(collection_id__in=self.get_collections())
return Work.objects.filter(collection__in=collections).select_related('collection')
class CollectionWorkListView(WorkListView):
def request_denied(self):
if 'auth' in self.request.GET:
if self.request.GET['auth'] != self.collection.auth():
if "auth" in self.request.GET:
if self.request.GET["auth"] != self.collection.auth():
raise SuspiciousOperation("Bad collection link")
self.add_authorized_key('collection', self.collection.pk, self.collection.nonce)
self.add_authorized_key(
"collection", self.collection.pk, self.collection.nonce
)
return HttpResponseRedirect(self.request.path)
return super().request_denied()
def get_works(self):
works = Work.objects.filter(collection=self.kwargs['collection'])
def get_collections(self):
return [int(self.kwargs["collection"])]
#if self.request.is_admin:
def get_works(self):
works = Work.objects.filter(collection=self.kwargs["collection"])
# if self.request.is_admin:
# loan_count = Count('project_items', Q(project_items__checkout__lte=now(), project_items__returned=None))
# works = works.annotate(loan_count=loan_count)
return works
def get_context_data(self, *args, **kwargs):
data = super(CollectionWorkListView, self).get_context_data(*args, **kwargs)
data['title'] = self.collection.name
data["title"] = self.collection.name
return data
class WorkAddView(CollectionMixin, FormView):
template_name = "interface/default_form.html"
form_class = forms.WorkCreateForm
@ -224,38 +289,52 @@ class WorkAddView(CollectionMixin, FormView):
def get_form(self, form_class=None):
form = super().get_form(form_class)
qs = models.Orchestration.objects.filter(Q(collection=None) | Q(collection=self.collection))
form.fields['orchestration'].queryset = qs.order_by('-collection_id', 'pk')
qs = models.Orchestration.objects.filter(
Q(collection=None) | Q(collection=self.collection)
)
form.fields["orchestration"].queryset = qs.order_by("-collection_id", "pk")
return form
def form_valid(self, form):
    """Save the new work, attach uploaded files and add it to the index.

    Redirects to the annotation page when exactly one document was uploaded,
    otherwise to the work's detail page.
    """
    work = form.save(commit=False)
    # work.ensemble_id = self.request.ensemble_id
    work.collection_id = self.collection.pk
    work.save()

    # handle the files
    uploads = self.request.FILES.getlist("uploads")
    docs = [work.docs.create(upload=f).pk for f in uploads]

    # The backend module bound to `indexer` only exposes index_docs() for
    # ready-made dicts. The model-aware entry point is
    # library.indexer.index_works(), which takes just the works.
    from library.indexer import index_works

    index_works([work])

    if len(docs) == 1:
        return redirect("document_annotate", docs[0])
    else:
        return redirect("work_detail", collection=self.collection.pk, pk=work.pk)
class WorkDetailView(CollectionMixin, DetailView):
model = models.Work
model = models.Work
class WorkUpdateView(CollectionMixin, UpdateView):
    """Edit an existing work and refresh its entry in the search index."""

    model = models.Work
    form_class = forms.WorkCreateForm
    template_name = "interface/default_form.html"

    def form_valid(self, form):
        response = super().form_valid(form)
        # The backend module bound to `indexer` only exposes index_docs() for
        # ready-made dicts. The model-aware entry point is
        # library.indexer.index_works(), which takes just the works.
        from library.indexer import index_works

        index_works([self.object])
        return response

    def get_success_url(self):
        return resolve_url("work_detail", self.collection.pk, self.kwargs["pk"])
class WorkAddToProject(CollectionMixin, FormView):
@ -263,13 +342,13 @@ class WorkAddToProject(CollectionMixin, FormView):
form_class = forms.ProjectSelectForm
template_name = "interface/default_form.html"
title = "Select project to add work to"
def get_object(self):
return Work.objects.get(pk=self.kwargs['pk'])
return Work.objects.get(pk=self.kwargs["pk"])
def get_form(self):
f = super(WorkAddToProject, self).get_form()
qs = f.fields['project'].queryset.select_related('ensemble')
qs = f.fields["project"].queryset.select_related("ensemble")
# Limit to projects for ensembles where we are an admin and they haven't occured yet
qs = qs.for_user(self.request.user).current()
@ -278,63 +357,73 @@ class WorkAddToProject(CollectionMixin, FormView):
work = self.get_object()
qs = qs.exclude(pk__in=work.projects.all())
f.fields['project'].queryset = qs.order_by('ensemble__name', 'name')
f.fields["project"].queryset = qs.order_by("ensemble__name", "name")
return f
def form_valid(self, form):
work = self.get_object()
project = form.cleaned_data['project']
work.project_items.create(project=project, approved_by=self.request.user, checkout=now())
return redirect('item_list', project=project.pk)
project = form.cleaned_data["project"]
work.project_items.create(
project=project, approved_by=self.request.user, checkout=now()
)
return redirect("item_list", project=project.pk)
class WorkPartsView(CollectionMixin, DetailView):
model = models.Work
template_name = "library/work_parts_fragment.html"
template_name = "library/work_parts_fragment.html"
class WorkPartSetView(CollectionMixin, DetailView):
template_name = "library/work_partset.html"
def post(self, request, *args, **kwargs):
work = self.get_object()
parts = request.POST.getlist('parts')
copies = request.POST.getlist('copies')
parts = request.POST.getlist("parts")
copies = request.POST.getlist("copies")
sections = []
for i, tag in enumerate(parts):
c = int(copies[i])
if c > 0:
for part in models.Section.objects.select_related('doc').filter(tag=tag, doc__work=work):
sections.append((part.doc.upload.path, part.name, part.start, part.end, c))
for part in models.Section.objects.select_related("doc").filter(
tag=tag, doc__work=work
):
sections.append(
(part.doc.upload.path, part.name, part.start, part.end, c)
)
result = extract_and_concat(sections)
download_name = f'{work.name}.pdf'
download_name = f"{work.name}.pdf"
response = FileResponse(result, content_type="application/pdf")
response['Content-Disposition'] = f'inline; filename="{download_name}"'
response["Content-Disposition"] = f'inline; filename="{download_name}"'
return response
def get_queryset(self):
works = Work.objects.all()
if not self.request.is_admin:
works = works.filter(collection__allowed_ensembles__ensemble=self.request.ensemble_id)
works = works.filter(
collection__allowed_ensembles__ensemble=self.request.ensemble_id
)
return works
class WorkDownloadView(CollectionMixin, SingleObjectMixin, View):
model = models.Work
def get(self, request, *args, **kwargs):
self.object = self.get_object()
tags = request.GET.getlist('tag')
tags = request.GET.getlist("tag")
if not tags:
raise Http404("No tags given")
sections = list(self.object.tagged_sections(*tags))
print(sections)
@ -346,86 +435,104 @@ class WorkDownloadView(CollectionMixin, SingleObjectMixin, View):
logger.debug("Redirecting to url")
return redirect(sections[0].upload.url)
result = extract_and_concat([ (s.upload.path, s.upload.name, s.start, s.end, 1) for s in sections ])
result = extract_and_concat(
[(s.upload.path, s.upload.name, s.start, s.end, 1) for s in sections]
)
tag_names = " - ".join([ str(MusicTag.from_tag(tag)) for tag in tags ])
tag_names = " - ".join([str(MusicTag.from_tag(tag)) for tag in tags])
download_name = f'{self.object.name} - {tag_names}.pdf'
download_name = f"{self.object.name} - {tag_names}.pdf"
response = FileResponse(result, content_type="application/pdf")
response['Content-Disposition'] = f'inline; filename="{download_name}"'
response["Content-Disposition"] = f'inline; filename="{download_name}"'
return response
class WorkAddDocumentView(CollectionMixin, CreateView):
template_name = "interface/default_form.html"
model = Document
fields = ['upload']
fields = ["upload"]
def title(self):
work = Work.objects.get(pk=self.kwargs['pk'])
work = Work.objects.get(pk=self.kwargs["pk"])
return f"Add a document to {work.name}"
def form_invalid(self, form):
if self.request.headers['Accept'] == 'application/json':
if self.request.headers["Accept"] == "application/json":
return HttpResponse(status=400)
return super().form_invalid(form)
def form_valid(self, form):
orig_name, ext = os.path.splitext(form.cleaned_data['upload'].name)
orig_name, ext = os.path.splitext(form.cleaned_data["upload"].name)
logger.info("Uploaded: %s", orig_name)
doc = form.save(commit=False)
doc.doctype = models.Document.DOCTYPE_MAP.get(ext.lower(), models.Document.DOCTYPE_MISC)
doc.work_id = self.kwargs['pk']
doc.doctype = models.Document.DOCTYPE_MAP.get(
ext.lower(), models.Document.DOCTYPE_MISC
)
doc.work_id = self.kwargs["pk"]
doc.save()
# auto tag the document
#name, ext = os.path.splitext(os.path.basename(doc.upload.name))
# name, ext = os.path.splitext(os.path.basename(doc.upload.name))
if doc.doctype == models.Document.DOCTYPE_PDF:
inst = auto_tag(orig_name)
if inst:
doc.sections.create(tag=inst.abbreviate())
if self.request.headers['Accept'] == 'application/json':
if self.request.headers["Accept"] == "application/json":
filename = os.path.basename(doc.upload.name)
return JsonResponse({
"message": "created",
"id": doc.pk,
"entry": render_to_string('library/document_entry.html',
{'collection': self.collection, 'doc': doc, 'request': self.request}
)
}, status=201)
return JsonResponse(
{
"message": "created",
"id": doc.pk,
"entry": render_to_string(
"library/document_entry.html",
{
"collection": self.collection,
"doc": doc,
"request": self.request,
},
),
},
status=201,
)
return redirect("document_annotate", self.collection.pk, doc.pk)
return redirect('document_annotate', self.collection.pk, doc.pk)
class DocumentMixin(CollectionMixin):
model = models.Document
def get_queryset(self):
qs = models.Document.objects.select_related('work')
qs = models.Document.objects.select_related("work")
if self.request.is_admin:
return qs
return qs.filter(work__collection=self.collection)
class DocumentDetailView(DocumentMixin, DetailView):
pass
class DocumentDownloadView(DocumentMixin, SingleObjectMixin, View):
class DocumentDownloadView(DocumentMixin, SingleObjectMixin, View):
def get(self, request, **args):
self.request = request
self.args = args
self.object = self.get_object()
if request.GET.get('method') == 'direct':
if request.GET.get("method") == "direct":
return redirect(self.object.upload.url)
response = FileResponse(self.object.upload.open('rb'), content_type="application/pdf")
response = FileResponse(
self.object.upload.open("rb"), content_type="application/pdf"
)
return response
class DocumentAnnotateView(DocumentMixin, DetailView):
template_name = 'library/document_annotate.html'
template_name = "library/document_annotate.html"
def post(self, request, **args):
self.request = request
@ -437,8 +544,8 @@ class DocumentAnnotateView(DocumentMixin, DetailView):
with transaction.atomic():
self.object.sections.all().delete()
for tag, start, end in data:
#pages.sort()
#end = pages[-1] if len(pages) > 1 else None
# pages.sort()
# end = pages[-1] if len(pages) > 1 else None
o = self.object.sections.create(tag=tag, start=start, end=end)
return HttpResponse(status=204)
@ -447,25 +554,29 @@ class DocumentAnnotateView(DocumentMixin, DetailView):
data = super(DocumentAnnotateView, self).get_context_data(**kwargs)
pages = []
for part in data['document'].sections.all():
for part in data["document"].sections.all():
pages.append((part.tag, part.start, part.end))
data['url'] = signed_url('document_download', collection=data['collection'].pk, pk=data['document'].pk)
data["url"] = signed_url(
"document_download",
collection=data["collection"].pk,
pk=data["document"].pk,
)
data['json_data'] = {'pageTags': pages, 'instruments': dict(MUSIC_TAGS)}
data["json_data"] = {"pageTags": pages, "instruments": dict(MUSIC_TAGS)}
return data
class DocumentDeleteView(DocumentMixin, DeleteView):
#def get_template_names(self):
class DocumentDeleteView(DocumentMixin, DeleteView):
# def get_template_names(self):
# return ["interface/default_form.html"]
def get_success_url(self):
return resolve_url('work_detail', self.collection.pk, self.object.work_id)
return resolve_url("work_detail", self.collection.pk, self.object.work_id)
class PartDownloadView(CollectionMixin, SingleObjectMixin, View):
pk_url_kwarg = 'section'
pk_url_kwarg = "section"
def get(self, request, **args):
self.request = request
@ -475,17 +586,25 @@ class PartDownloadView(CollectionMixin, SingleObjectMixin, View):
if self.object.start is None:
return redirect(self.object.doc.upload.url)
result = extract_pages(self.object.doc.upload.path, self.object.doc.work.name, self.object.start, self.object.end)
result = extract_pages(
self.object.doc.upload.path,
self.object.doc.work.name,
self.object.start,
self.object.end,
)
#download_name = f'{self.object.doc.work.name}_{self.object.instrument}.pdf'
# download_name = f'{self.object.doc.work.name}_{self.object.instrument}.pdf'
response = FileResponse(result, content_type="application/pdf")
response['Content-Disposition'] = f'inline; filename="{self.args["filename"]}"'
response["Content-Disposition"] = f'inline; filename="{self.args["filename"]}"'
return response
def get_object(self):
return Section.objects.filter(doc__work__collection=self.collection).select_related('doc', 'doc__work').get(pk=self.kwargs['section'])
return (
Section.objects.filter(doc__work__collection=self.collection)
.select_related("doc", "doc__work")
.get(pk=self.kwargs["section"])
)
class StorageBrowserView(CollectionMixin, TemplateView):
@ -493,7 +612,8 @@ class StorageBrowserView(CollectionMixin, TemplateView):
def get_context_data(self, **kwargs):
data = super().get_context_data(**kwargs)
folder = self.kwargs.get('folder') or data['collection'].prefix
data['folders'], data['files'] = data['collection'].storage.instance().listdir(folder)
folder = self.kwargs.get("folder") or data["collection"].prefix
data["folders"], data["files"] = (
data["collection"].storage.instance().listdir(folder)
)
return data

View File

@ -16,7 +16,8 @@ dependencies = [
"django-byostorage @ git+https://gitea.tfconsulting.com.au/tris/django-byostorage.git@9903bb00888f20dfd2d39754e5ee22eeb5f36298",
"requests (>=2.32.5,<3.0.0)",
"django-storages (>=1.14.6,<2.0.0)",
"boto3 (>=1.40.20,<2.0.0)"
"boto3 (>=1.40.20,<2.0.0)",
"whoosh (>=2.7.4,<3.0.0)"
]
[tool.poetry]