Compare commits

...

2 Commits

Author SHA1 Message Date
5e0e165037 Interface cleanup 2026-05-12 11:04:22 +10:00
4164d56dea Library cleanup 2026-05-12 11:03:11 +10:00
24 changed files with 937 additions and 576 deletions

View File

@ -2,28 +2,33 @@ from django.contrib import admin
from . import models
class EnsembleAdmin(admin.ModelAdmin):
list_display = ['name', 'slug']
list_display = ["name", "slug"]
class ModuleInline(admin.StackedInline):
model = models.Module
extra = 0
class ProjectAdmin(admin.ModelAdmin):
list_display = ['name', 'ensemble', 'event_date', 'active']
list_filter = ['ensemble', 'active']
class ProjectAdmin(admin.ModelAdmin):
list_display = ["name", "ensemble", "event_date", "active"]
list_filter = ["ensemble", "active"]
inlines = [ModuleInline]
class ResourceAdmin(admin.ModelAdmin):
list_display = ['name', 'media_type', 'project']
list_filter = ['project']
list_display = ["name", "media_type", "project"]
list_filter = ["project"]
class WikiPageAdmin(admin.ModelAdmin):
list_display = ['title', 'project']
list_filter = ['project']
list_display = ["title", "project"]
list_filter = ["project"]
admin.site.register(models.Ensemble, EnsembleAdmin)
admin.site.register(models.Project, ProjectAdmin)
admin.site.register(models.Resource, ResourceAdmin)
admin.site.register(models.WikiPage, WikiPageAdmin)
admin.site.register(models.WikiPage, WikiPageAdmin)

View File

@ -2,4 +2,4 @@ from django.apps import AppConfig
class InterfaceConfig(AppConfig):
name = 'interface'
name = "interface"

View File

@ -1,4 +1,5 @@
from crispy_forms.layout import Field
class BulmaFileUpload(Field):
template = 'bulma/file_upload.html'
template = "bulma/file_upload.html"

View File

@ -5,58 +5,72 @@ from crispy_bulma.layout import FormGroup
from . import models, fields
class BaseForm(forms.Form):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.helper = self.get_form_helper()
def get_form_helper(self):
helper = FormHelper(self)
#helper.add_input(Submit('submit', 'Submit', css_class='button is-link'))
#helper.layout.subm append(HTML('<a class="button is-light">Cancel</a>'))
#print(helper.layout)
helper.layout.append(FormGroup(
Submit('submit', 'Save', css_class="button is-primary"),
HTML('{% if view.cancel_url %}<div class="control"><a href="{{ view.cancel_url }}" class="button is-light">Cancel</a></div>{% endif %}')
))
# helper.add_input(Submit('submit', 'Submit', css_class='button is-link'))
# helper.layout.subm append(HTML('<a class="button is-light">Cancel</a>'))
# print(helper.layout)
helper.layout.append(
FormGroup(
Submit("submit", "Save", css_class="button is-primary"),
HTML(
'{% if view.cancel_url %}<div class="control"><a href="{{ view.cancel_url }}" class="button is-light">Cancel</a></div>{% endif %}'
),
)
)
return helper
class ProjectForm(forms.ModelForm, BaseForm):
class Meta:
model = models.Project
fields = ['name', 'description', 'modules', 'event_date']
#widgets = {
fields = ["name", "description", "modules", "event_date"]
# widgets = {
# 'event_date': forms.DateTimeInput(attrs={'type': 'date'})
#}
# }
modules = forms.MultipleChoiceField(
choices=[(x, x.title()) for x in models.settings.POLYPHONIC_MODULES],
widget=forms.CheckboxSelectMultiple,
required=False,
)
modules = forms.MultipleChoiceField(choices=[ (x, x.title()) for x in models.settings.POLYPHONIC_MODULES ],
widget=forms.CheckboxSelectMultiple, required=False)
class ResourceForm(forms.ModelForm, BaseForm):
class Meta:
model = models.Resource
fields = ['name', 'media_type', 'description', 'file']
fields = ["name", "media_type", "description", "file"]
def get_form_helper(self):
helper = super().get_form_helper()
helper[3].wrap(fields.BulmaFileUpload)
return helper
class WikiForm(forms.ModelForm, BaseForm):
class WikiForm(forms.ModelForm, BaseForm):
class Meta:
model = models.WikiPage
fields = ['title', 'markdown']
fields = ["title", "markdown"]
class CodeForm(BaseForm):
code = forms.CharField(max_length=14,
widget=forms.TextInput(attrs={'placeholder': 'xxx-xxx-xxx', 'inputmode': 'numeric'}))
code = forms.CharField(
max_length=14,
widget=forms.TextInput(
attrs={"placeholder": "xxx-xxx-xxx", "inputmode": "numeric"}
),
)
passphrase = forms.CharField(max_length=32)
class ResourceUploadForm(forms.Form):
pass
# file = S3UploadField()
# file = S3UploadField()

View File

@ -14,11 +14,12 @@ import os.path
from .utils import sign_data
MEDIA_TYPES = [
('audio', "Audio"),
('video', "Video"),
('general', "General"),
("audio", "Audio"),
("video", "Video"),
("general", "General"),
]
def rough_date(d):
if not d:
return False, "sometime..."
@ -27,53 +28,62 @@ def rough_date(d):
if in_past:
days = abs(days)
if days == 0:
m = int((d-timezone.now()).seconds/60)
m = int((d - timezone.now()).seconds / 60)
if m > 60:
return in_past, "{0:d} hours".format(int(m / 60))
return in_past, "{0:d} minutes!".format(int(m % 60))
if days >= 14:
return in_past, "{0:d} weeks".format(int(days/7))
return in_past, "{0:d} weeks".format(int(days / 7))
if days >= 7:
return in_past, "{0:d} weeks, {1:d} days".format(int(days / 7), int(days % 7))
return in_past, f"{days} days"
def generate_code(length=9):
return "".join([ random.choice('0123456789') for _ in range(length) ])
return "".join([random.choice("0123456789") for _ in range(length)])
class EnsembleQuerySet(models.QuerySet):
def for_user(self, user, ensemble_keys=[], project_keys=[]):
if user.is_superuser:
return self
return self
f = models.Q(slug__in=ensemble_keys) | models.Q(projects__in=project_keys)
if user.is_authenticated:
f |= models.Q(admins=user.pk)
return self.filter(f).distinct()
class Ensemble(models.Model):
''' A group that plays together
'''
name = models.CharField(max_length=100,
help_text="Display name")
slug = models.SlugField(max_length=100, editable=False, unique=True,
help_text="Short name for the ensemble - used for folders")
admins = models.ManyToManyField('auth.User', related_name='ensembles')
details = models.TextField(blank=True,
help_text="Description of the ensemble (markdown)")
storage = models.ForeignKey('byostorage.UserStorage', null=True, on_delete=models.SET_NULL,
help_text="Default storage for this ensemble")
nonce = models.SmallIntegerField(default=1,
help_text="Increment this to reset the authentication links")
"""A group that plays together"""
name = models.CharField(max_length=100, help_text="Display name")
slug = models.SlugField(
max_length=100,
editable=False,
unique=True,
help_text="Short name for the ensemble - used for folders",
)
admins = models.ManyToManyField("auth.User", related_name="ensembles")
details = models.TextField(
blank=True, help_text="Description of the ensemble (markdown)"
)
storage = models.ForeignKey(
"byostorage.UserStorage",
null=True,
on_delete=models.SET_NULL,
help_text="Default storage for this ensemble",
)
nonce = models.SmallIntegerField(
default=1, help_text="Increment this to reset the authentication links"
)
objects = EnsembleQuerySet.as_manager()
class Meta:
ordering = ('slug', )
ordering = ("slug",)
def active_projects(self):
return self.projects.active().current()
@ -83,7 +93,7 @@ class Ensemble(models.Model):
return False
if user.is_superuser:
return True
return user.pk in self.admins.values_list('pk', flat=True)
return user.pk in self.admins.values_list("pk", flat=True)
def save(self, **kwargs):
if not self.slug:
@ -91,49 +101,56 @@ class Ensemble(models.Model):
super(Ensemble, self).save(**kwargs)
def get_absolute_url(self):
return resolve_url('ensemble_detail', ensemble=self.slug)
return resolve_url("ensemble_detail", ensemble=self.slug)
def auth(self):
return sign_data(f'{self.pk}-{self.nonce}', 12)
return sign_data(f"{self.pk}-{self.nonce}", 12)
def __str__(self):
return self.name
class ProjectQuerySet(models.QuerySet):
def current(self):
return self.filter(models.Q(event_date__gte=(timezone.now()-timezone.timedelta(7))) | models.Q(event_date=None))
return self.filter(
models.Q(event_date__gte=(timezone.now() - timezone.timedelta(7)))
| models.Q(event_date=None)
)
def active(self):
return self.filter(active=True)
def for_user(self, user, project_keys=[], ensemble_keys=[]):
if user.is_superuser:
return self
return self
f = models.Q(pk__in=project_keys) | models.Q(ensemble__slug__in=ensemble_keys)
if user.is_authenticated:
f |= models.Q(ensemble__admins=user.pk)
return self.filter(f)
class Project(models.Model):
''' A Project linked to an ensemble
'''
"""A Project linked to an ensemble"""
name = models.CharField(max_length=100)
ensemble = models.ForeignKey(Ensemble, related_name='projects', on_delete=models.CASCADE, null=True)
description = models.TextField(blank=True,
help_text="Markdown format")
ensemble = models.ForeignKey(
Ensemble, related_name="projects", on_delete=models.CASCADE, null=True
)
description = models.TextField(blank=True, help_text="Markdown format")
active = models.BooleanField(default=True)
event_date =models.DateTimeField(null=True, blank=True)
event_date = models.DateTimeField(null=True, blank=True)
owner = models.CharField(max_length=255, blank=True)
nonce = models.SmallIntegerField(default=1,
help_text="Increment this to reset the authentication links")
nonce = models.SmallIntegerField(
default=1, help_text="Increment this to reset the authentication links"
)
objects = ProjectQuerySet.as_manager()
class Meta:
ordering = ['active', 'event_date']
ordering = ["active", "event_date"]
@property
def days(self):
@ -162,62 +179,74 @@ class Project(models.Model):
@property
def active_modules(self):
return self.modules.values_list('name', flat=True)
return self.modules.values_list("name", flat=True)
def get_absolute_url(self):
return resolve_url('project_detail', project=self.pk)
return resolve_url("project_detail", project=self.pk)
def auth(self):
return sign_data(f'{self.pk}-{self.nonce}', 12)
return sign_data(f"{self.pk}-{self.nonce}", 12)
def __str__(self):
return self.name
class Module(models.Model):
''' Enable modules on a oriject
'''
name = models.SlugField(max_length=20, choices=[ (x, x.title()) for x in settings.POLYPHONIC_MODULES ])
project = models.ForeignKey(Project, related_name="modules", on_delete=models.CASCADE)
"""Enable modules on a oriject"""
name = models.SlugField(
max_length=20, choices=[(x, x.title()) for x in settings.POLYPHONIC_MODULES]
)
project = models.ForeignKey(
Project, related_name="modules", on_delete=models.CASCADE
)
def __str__(self):
return self.name
def resource_key(resource, filename):
return f'{resource.project.folder}/resources/{filename}'
return f"{resource.project.folder}/resources/{filename}"
class Resource(models.Model):
''' A viewable file resource attached to a project
"""A viewable file resource attached to a project
e.g PDF instructions, MP3 backing track
'''
project = models.ForeignKey(Project, related_name='resources', on_delete=models.CASCADE)
"""
project = models.ForeignKey(
Project, related_name="resources", on_delete=models.CASCADE
)
name = models.CharField(max_length=100)
description = models.TextField(blank=True)
file = models.FileField(storage=BYOStorage(), upload_to=resource_key)
media_type = models.CharField(max_length=10, choices=MEDIA_TYPES, default='*')
media_type = models.CharField(max_length=10, choices=MEDIA_TYPES, default="*")
visible = models.BooleanField(default=True)
class Meta:
ordering = ['-visible', '-pk']
ordering = ["-visible", "-pk"]
def accept(self):
if self.media_type == 'general':
if self.media_type == "general":
return ".*"
return f"{self.media_type}/*"
def __str__(self):
return self.name
class WikiPage(models.Model):
''' An editable wiki page for the project in markdown format
'''
project = models.ForeignKey(Project, related_name='wiki_pages', on_delete=models.CASCADE)
"""An editable wiki page for the project in markdown format"""
project = models.ForeignKey(
Project, related_name="wiki_pages", on_delete=models.CASCADE
)
title = models.CharField(max_length=255)
markdown = models.TextField()
def get_absolute_url(self):
return resolve_url('wiki', project=self.project_id, pk=self.pk)
return resolve_url("wiki", project=self.project_id, pk=self.pk)
def __str__(self):
return self.title
return self.title

View File

@ -3,7 +3,9 @@ import os.path
register = template.Library()
def basename(value):
return os.path.basename(value)
register.filter('basename', basename)
register.filter("basename", basename)

View File

@ -3,12 +3,16 @@ from django.utils import timesince
register = template.Library()
def roughtimesince(value):
return timesince.timesince(value, depth=1)
register.filter('roughtimesince', roughtimesince)
register.filter("roughtimesince", roughtimesince)
def roughtimeuntil(value):
return timesince.timeuntil(value, depth=1)
register.filter('roughtimeuntil', roughtimeuntil)
register.filter("roughtimeuntil", roughtimeuntil)

View File

@ -2,9 +2,10 @@ from django import template
register = template.Library()
@register.simple_tag(takes_context=True)
def url_update(context, **kwargs):
params = context.request.GET.copy()
for k in kwargs:
params[k] = kwargs[k]
return "?" + params.urlencode()
return "?" + params.urlencode()

View File

@ -4,8 +4,8 @@ from django.contrib.auth.models import User
from django.utils import timezone
from datetime import timedelta
class AccessTestCase(TestCase):
class AccessTestCase(TestCase):
USERS = ()
ENSEMBLES = ()
@ -19,13 +19,13 @@ class AccessTestCase(TestCase):
cls.users = {}
for details in cls.USERS:
cls.users[details['username']] = User.objects.create_user(**details)
cls.users[details["username"]] = User.objects.create_user(**details)
now = timezone.now()
cls.ensembles = {}
for details in cls.ENSEMBLES:
admins = details.pop('admins', [])
admins = details.pop("admins", [])
obj = models.Ensemble.objects.create(**details)
for admin in admins:
obj.admins.add(cls.users[admin])
@ -33,37 +33,41 @@ class AccessTestCase(TestCase):
cls.projects = {}
for details in cls.PROJECTS:
when = details.pop('when', 0)
ensemble = details.pop('ensemble')
details['event_date'] = now + timedelta(days=when) if when else None
when = details.pop("when", 0)
ensemble = details.pop("ensemble")
details["event_date"] = now + timedelta(days=when) if when else None
obj = cls.ensembles[ensemble].projects.create(**details)
cls.projects[details['name']] = obj
cls.projects[details["name"]] = obj
return
def test_protected_views(self):
self.assertAccess({ x: False for x in self.PROTECTED_URLS })
self.assertAccess({x: False for x in self.PROTECTED_URLS})
if 'admin' in self.users:
self.client.force_login(self.users['admin'])
self.assertAccess({ x: True for x in self.PROTECTED_URLS })
if "admin" in self.users:
self.client.force_login(self.users["admin"])
self.assertAccess({x: True for x in self.PROTECTED_URLS})
def login(self, user, passwd):
response = self.client.post('/login', {'username': user, 'password': passwd})
response = self.client.post("/login", {"username": user, "password": passwd})
self.assertEqual(response.status_code, 302, f"Failed to login as {user}")
def authorize(self, model, **kwargs):
object = model.objects.get(**kwargs)
response = self.client.get(f'{object.get_absolute_url()}?auth={object.auth()}')
response = self.client.get(f"{object.get_absolute_url()}?auth={object.auth()}")
self.assertEqual(response.status_code, 302)
def assertAccess(self, urls):
for url, expected in urls.items():
response = self.client.get(url)
self.assertEqual(response.status_code == 200, expected, f"Expected {expected} for {url} (status: {response.status_code})")
self.assertEqual(
response.status_code == 200,
expected,
f"Expected {expected} for {url} (status: {response.status_code})",
)
def assertObjectList(self, response, expected, element='name'):
def assertObjectList(self, response, expected, element="name"):
self.assertEqual(response.status_code, 200, "No result returned")
objects = response.context['object_list'].values_list(element, flat=True)
self.assertEqual(list(objects), expected)
objects = response.context["object_list"].values_list(element, flat=True)
self.assertEqual(list(objects), expected)

View File

@ -5,179 +5,213 @@ from django.contrib.auth.models import User
from . import AccessTestCase
class InterfaceAccessTestCase(AccessTestCase):
class InterfaceAccessTestCase(AccessTestCase):
USERS = (
{'username': 'admin', 'password': 'secret', 'is_superuser': True, 'is_staff': True},
{'username': 'homer', 'password': 'maggie'},
{
"username": "admin",
"password": "secret",
"is_superuser": True,
"is_staff": True,
},
{"username": "homer", "password": "maggie"},
)
ENSEMBLES = (
{'name': 'The Be Sharps', 'slug': 'be-sharps', 'admins': ['homer']},
{'name': 'Lisa & the Bleeding Gums', 'slug': 'bleeding-gums'},
{'name': 'Party Posse'},
{"name": "The Be Sharps", "slug": "be-sharps", "admins": ["homer"]},
{"name": "Lisa & the Bleeding Gums", "slug": "bleeding-gums"},
{"name": "Party Posse"},
)
PROJECTS = (
{'name': 'Baker St', 'ensemble': 'bleeding-gums', 'when': -12},
{'name': 'Navy Recruitment Day', 'ensemble': 'party-posse', 'when': 6},
{'name': 'Barbershop Contest', 'ensemble': 'be-sharps', 'when': 28},
{'name': 'Open Mic Night', 'ensemble': 'bleeding-gums', 'when': 1 },
{'name': 'Current Repertoire', 'ensemble': 'be-sharps'},
{"name": "Baker St", "ensemble": "bleeding-gums", "when": -12},
{"name": "Navy Recruitment Day", "ensemble": "party-posse", "when": 6},
{"name": "Barbershop Contest", "ensemble": "be-sharps", "when": 28},
{"name": "Open Mic Night", "ensemble": "bleeding-gums", "when": 1},
{"name": "Current Repertoire", "ensemble": "be-sharps"},
)
PROTECTED_URLS = (
'/ensembles/be-sharps',
'/ensembles/be-sharps/new-project',
'/projects/3',
'/projects/3/resources',
'/projects/3/resources/add',
'/admin/interface/ensemble/',
'/admin/interface/project/',
'/admin/interface/resource/',
'/admin/interface/wikipage/',
"/ensembles/be-sharps",
"/ensembles/be-sharps/new-project",
"/projects/3",
"/projects/3/resources",
"/projects/3/resources/add",
"/admin/interface/ensemble/",
"/admin/interface/project/",
"/admin/interface/resource/",
"/admin/interface/wikipage/",
)
def test_bad_login(self):
with self.assertRaisesMessage(self.failureException, 'Failed to login as admin'):
self.login('admin', 'admin')
with self.assertRaisesMessage(
self.failureException, "Failed to login as admin"
):
self.login("admin", "admin")
def test_superuser_ensembles(self):
self.login('admin', 'secret')
response = self.client.get('/ensembles')
self.assertObjectList(response, ['The Be Sharps', 'Lisa & the Bleeding Gums', 'Party Posse'])
self.assertContains(response, 'Django Admin')
self.login("admin", "secret")
response = self.client.get("/ensembles")
self.assertObjectList(
response, ["The Be Sharps", "Lisa & the Bleeding Gums", "Party Posse"]
)
self.assertContains(response, "Django Admin")
def test_superuser_ensemble_permissions(self):
self.login('admin', 'secret')
response = self.client.get('/ensembles/party-posse')
self.assertTrue(response.context['request'].is_admin)
self.login("admin", "secret")
response = self.client.get("/ensembles/party-posse")
self.assertTrue(response.context["request"].is_admin)
self.assertContains(response, "Add project")
self.assertAccess({
'/ensembles/be-sharps': True,
'/ensembles/bleeding-gums': True,
'/ensembles/party-posse': True,
'/ensembles/unknown': False,
'/ensembles/be-sharps/new-project': True,
})
self.assertAccess(
{
"/ensembles/be-sharps": True,
"/ensembles/bleeding-gums": True,
"/ensembles/party-posse": True,
"/ensembles/unknown": False,
"/ensembles/be-sharps/new-project": True,
}
)
def test_superuser_projects(self):
self.login('admin', 'secret')
response = self.client.get('/projects')
self.assertObjectList(response, ['Current Repertoire', 'Open Mic Night', 'Navy Recruitment Day', 'Barbershop Contest'])
self.assertObjectList(self.client.get('/ensembles/bleeding-gums'), ['Open Mic Night'])
self.assertObjectList(self.client.get('/ensembles/bleeding-gums?inactive'), ['Open Mic Night', 'Baker St'])
self.login("admin", "secret")
response = self.client.get("/projects")
self.assertObjectList(
response,
[
"Current Repertoire",
"Open Mic Night",
"Navy Recruitment Day",
"Barbershop Contest",
],
)
self.assertObjectList(
self.client.get("/ensembles/bleeding-gums"), ["Open Mic Night"]
)
self.assertObjectList(
self.client.get("/ensembles/bleeding-gums?inactive"),
["Open Mic Night", "Baker St"],
)
def test_user_ensembles(self):
self.login('homer', 'maggie')
response = self.client.get('/ensembles')
self.assertObjectList(response, ['The Be Sharps'])
self.login("homer", "maggie")
response = self.client.get("/ensembles")
self.assertObjectList(response, ["The Be Sharps"])
self.assertNotContains(response, 'Django Admin')
self.assertNotContains(response, "Django Admin")
def test_user_ensemble_permissions(self):
self.login('homer', 'maggie')
response = self.client.get('/ensembles/be-sharps')
self.assertTrue(response.context['request'].is_admin)
self.login("homer", "maggie")
response = self.client.get("/ensembles/be-sharps")
self.assertTrue(response.context["request"].is_admin)
self.assertContains(response, "Add project")
self.assertContains(response, 'Show all')
self.assertAccess({
'/ensembles/be-sharps': True,
'/ensembles/bleeding-gums': False,
'/ensembles/party-posse': False,
'/ensembles/be-sharps/new-project': True,
'/ensembles/party-posse/new-project': False,
})
self.assertContains(response, "Show all")
self.assertAccess(
{
"/ensembles/be-sharps": True,
"/ensembles/bleeding-gums": False,
"/ensembles/party-posse": False,
"/ensembles/be-sharps/new-project": True,
"/ensembles/party-posse/new-project": False,
}
)
self.authorize(models.Ensemble, slug='bleeding-gums')
self.assertAccess({
'/ensembles/be-sharps': True,
'/ensembles/bleeding-gums': True,
'/ensembles/party-posse': False,
'/ensembles/be-sharps/new-project': True,
'/ensembles/party-posse/new-project': False,
})
response = self.client.get('/ensembles/bleeding-gums')
self.assertFalse(response.context['request'].is_admin)
self.assertNotContains(response, 'Add project')
self.assertNotContains(response, 'Show all')
self.authorize(models.Ensemble, slug="bleeding-gums")
self.assertAccess(
{
"/ensembles/be-sharps": True,
"/ensembles/bleeding-gums": True,
"/ensembles/party-posse": False,
"/ensembles/be-sharps/new-project": True,
"/ensembles/party-posse/new-project": False,
}
)
response = self.client.get("/ensembles/bleeding-gums")
self.assertFalse(response.context["request"].is_admin)
self.assertNotContains(response, "Add project")
self.assertNotContains(response, "Show all")
def test_user_projects(self):
self.login('homer', 'maggie')
response = self.client.get('/projects')
self.assertObjectList(response, ['Current Repertoire', 'Barbershop Contest'])
response = self.client.get('/projects/3')
self.assertTrue(response.context['request'].is_admin)
self.login("homer", "maggie")
response = self.client.get("/projects")
self.assertObjectList(response, ["Current Repertoire", "Barbershop Contest"])
response = self.client.get("/projects/3")
self.assertTrue(response.context["request"].is_admin)
self.assertAccess({
'/projects/3': True,
'/projects/3/resources': True,
'/projects/3/resources/add': True,
'/projects/4': False,
'/projects/4/resources': False,
'/projects/4/resources/add': False,
})
self.assertAccess(
{
"/projects/3": True,
"/projects/3/resources": True,
"/projects/3/resources/add": True,
"/projects/4": False,
"/projects/4/resources": False,
"/projects/4/resources/add": False,
}
)
self.authorize(models.Project, pk=4)
response = self.client.get('/projects')
self.assertObjectList(response, ['Current Repertoire', 'Open Mic Night', 'Barbershop Contest'])
response = self.client.get('/projects/4')
self.assertFalse(response.context['request'].is_admin)
response = self.client.get("/projects")
self.assertObjectList(
response, ["Current Repertoire", "Open Mic Night", "Barbershop Contest"]
)
response = self.client.get("/projects/4")
self.assertFalse(response.context["request"].is_admin)
def test_anon_ensembles(self):
response = self.client.get('/ensembles')
response = self.client.get("/ensembles")
self.assertObjectList(response, [])
self.assertContains(response, 'You don\'t currently have access to any ensembles')
self.assertContains(
response, "You don't currently have access to any ensembles"
)
def test_anon_authorized_ensemble(self):
self.authorize(models.Ensemble, slug='party-posse')
response = self.client.get('/ensembles/party-posse')
self.assertContains(response, 'Party Posse')
response = self.client.get('/ensembles')
self.assertObjectList(response, ['Party Posse'])
self.authorize(models.Ensemble, slug="party-posse")
response = self.client.get("/ensembles/party-posse")
self.assertContains(response, "Party Posse")
response = self.client.get("/ensembles")
self.assertObjectList(response, ["Party Posse"])
self.assertAccess(
{
"/ensembles/be-sharps": False,
"/ensembles/party-posse": True,
"/ensembles/bleeding-gums": False,
"/ensembles/unknown": False,
}
)
response = self.client.get("/projects")
self.assertObjectList(response, ["Navy Recruitment Day"])
self.assertAccess({
'/ensembles/be-sharps': False,
'/ensembles/party-posse': True,
'/ensembles/bleeding-gums': False,
'/ensembles/unknown': False,
})
response = self.client.get('/projects')
self.assertObjectList(response, ['Navy Recruitment Day'])
def test_anon_authorized_project(self):
self.authorize(models.Project, pk=4)
self.assertObjectList(self.client.get('/projects'), ['Open Mic Night'])
self.assertObjectList(self.client.get('/ensembles'), ['Lisa & the Bleeding Gums'])
self.assertObjectList(self.client.get("/projects"), ["Open Mic Night"])
self.assertObjectList(
self.client.get("/ensembles"), ["Lisa & the Bleeding Gums"]
)
self.assertAccess({
'/projects/4': True,
'/projects/4/resources': True,
'/projects/1': False,
'/projects/1/resources': False,
})
self.assertAccess(
{
"/projects/4": True,
"/projects/4/resources": True,
"/projects/1": False,
"/projects/1/resources": False,
}
)
def test_anon_permission_denied(self):
self.assertAccess({
'/ensembles': True,
'/ensembles/be-sharps': False,
'/ensembles/party-posse': False,
'/ensembles/bleeding-gums': False,
'/ensembles/unknown': False,
})
self.assertAccess(
{
"/ensembles": True,
"/ensembles/be-sharps": False,
"/ensembles/party-posse": False,
"/ensembles/bleeding-gums": False,
"/ensembles/unknown": False,
}
)
def test_anon_deauthorize_project(self):
self.authorize(models.Project, pk=4)
self.assertAccess({
'/projects/4': True
})
self.assertAccess({"/projects/4": True})
models.Project.objects.filter(pk=4).update(nonce=2)
self.assertAccess({
'/projects/4': False
})
self.assertAccess({"/projects/4": False})

View File

@ -1,6 +1,6 @@
from django.test import TestCase
class IntegrationTestCase(TestCase):
class IntegrationTestCase(TestCase):
def test_runs(self):
self.assertTrue(True)
self.assertTrue(True)

View File

@ -77,4 +77,3 @@ if settings.DEBUG:
urlpatterns.append(
path("local_storage/<path:path>", serve, {"document_root": "local_storage"})
)

View File

@ -5,8 +5,10 @@ from django.core.exceptions import SuspiciousOperation
signer = Signer()
import logging
logger = logging.getLogger(__name__)
def sign_data(data, l=None):
sig = signer.sign(data)
p = len(data) + 1
@ -14,6 +16,7 @@ def sign_data(data, l=None):
l += p
return sig[p:l]
def signed_url(name, **kwargs):
"""
>>> signed_url('foo/bar')
@ -23,16 +26,19 @@ def signed_url(name, **kwargs):
sep = "&" if "?" in url else "?"
return sig.replace(":", f"{sep}auth=")
def check_signed_url(full_path):
p = full_path.rfind('auth')
url = full_path[:p-1]
p = full_path.rfind("auth")
url = full_path[: p - 1]
logger.debug("check_signed_url: %s", url)
signed = signed_url(url)
if signed != full_path:
logger.debug("Mismatch: %s != %s", full_path, signed)
signed = "_HIDDEN_"
raise SuspiciousOperation("Bad auth code")
if __name__ == '__main__':
if __name__ == "__main__":
import doctest
print(doctest.testmod())
print(doctest.testmod())

View File

@ -2,60 +2,77 @@ from django.contrib import admin
from . import models
class EnsembleAccessInline(admin.StackedInline):
model = models.EnsembleAccess
extra = 0
class CollectionAdmin(admin.ModelAdmin):
list_display = ['name', 'location', 'storage', 'prefix']
list_display = ["name", "location", "storage", "prefix"]
inlines = [EnsembleAccessInline]
admin.site.register(models.Collection, CollectionAdmin)
class ItemInline(admin.TabularInline):
model = models.ProjectItem
extra = 0
class DocInline(admin.TabularInline):
model = models.Document
extra = 0
class MetaInline(admin.TabularInline):
model = models.WorkMeta
extra = 0
class WorkAdmin(admin.ModelAdmin):
list_display = ['name', 'composer', 'edition', 'identifier', 'running_time']
list_filter = ['collection']
search_fields = ['name', 'composer']
list_display = ["name", "composer", "edition", "identifier", "running_time"]
list_filter = ["collection"]
search_fields = ["name", "composer"]
inlines = [MetaInline, DocInline, ItemInline]
admin.site.register(models.Work, WorkAdmin)
class SectionInline(admin.TabularInline):
model = models.Section
fields = ['tag', 'start', 'end', 'page']
fields = ["tag", "start", "end", "page"]
class DocumentAdmin(admin.ModelAdmin):
list_display = ['work', '__str__']
list_filter = ['work__collection']
list_display = ["work", "__str__"]
list_filter = ["work__collection"]
inlines = [SectionInline]
admin.site.register(models.Document, DocumentAdmin)
class ItemAdmin(admin.ModelAdmin):
list_display = ['project', 'work', 'order']
list_filter = ['project']
list_display = ["project", "work", "order"]
list_filter = ["project"]
admin.site.register(models.ProjectItem, ItemAdmin)
class EnsembleAccessAdmin(admin.ModelAdmin):
list_display = ['ensemble', 'collection', 'access_type']
list_filter = ['ensemble']
list_display = ["ensemble", "collection", "access_type"]
list_filter = ["ensemble"]
admin.site.register(models.EnsembleAccess, EnsembleAccessAdmin)
class OrchestrationAdmin(admin.ModelAdmin):
list_display = ['name', 'instruments']
admin.site.register(models.Orchestration, OrchestrationAdmin)
class OrchestrationAdmin(admin.ModelAdmin):
list_display = ["name", "instruments"]
admin.site.register(models.Orchestration, OrchestrationAdmin)

View File

@ -2,4 +2,4 @@ from django.apps import AppConfig
class LibraryConfig(AppConfig):
name = 'library'
name = "library"

View File

@ -5,29 +5,42 @@ from interface.forms import BaseForm
class WorkCreateForm(forms.ModelForm, BaseForm):
class Meta:
model = Work
fields = ['name', 'composer', 'edition', 'code', 'orchestration', 'licence', 'running_time', 'notes']
fields = [
"name",
"composer",
"edition",
"code",
"orchestration",
"licence",
"running_time",
"notes",
]
class PlaylistAddForm(forms.Form):
work = forms.ModelChoiceField(queryset=Work.objects.all())
def __init__(self, instance, *args, **kwargs):
super(PlaylistAddForm, self).__init__(*args, **kwargs)
super(PlaylistAddForm, self).__init__(*args, **kwargs)
existing = [ x[0] for x in instance.works.values_list('pk') ]
existing = [x[0] for x in instance.works.values_list("pk")]
qs = Work.objects.filter(ensemble_id=instance.ensemble_id).exclude(
id__in=existing
)
self.fields["work"].queryset = qs
self.instance = instance
qs = Work.objects.filter(ensemble_id=instance.ensemble_id).exclude(id__in=existing)
self.fields['work'].queryset = qs
self.instance = instance
def save(self):
self.instance.works.add(self.cleaned_data['work'])
self.instance.works.add(self.cleaned_data["work"])
class ProjectEnsembleChoiceField(forms.ModelChoiceField):
def label_from_instance(self, obj):
return f"{obj.ensemble.name} - {obj.name}"
class ProjectSelectForm(BaseForm):
project = ProjectEnsembleChoiceField(queryset=Project.objects.all())
project = ProjectEnsembleChoiceField(queryset=Project.objects.all())

View File

@ -4,17 +4,20 @@ import csv
from library import models
class Command(BaseCommand):
help = 'Imports works from a csv file'
help = "Imports works from a csv file"
def add_arguments(self, parser):
parser.add_argument('collection', type=int, help="Collection ID")
parser.add_argument('source', type=argparse.FileType('r'), help="Source CSV")
parser.add_argument("collection", type=int, help="Collection ID")
parser.add_argument("source", type=argparse.FileType("r"), help="Source CSV")
def handle(self, *args, **options):
collection = models.Collection.objects.get(pk=options['collection'])
collection = models.Collection.objects.get(pk=options["collection"])
reader = csv.DictReader(options['source'])
reader = csv.DictReader(options["source"])
for row in reader:
collection.works.create(name=row['Piece'], composer=row['Composer'], notes=row['Notes'])
collection.works.create(
name=row["Piece"], composer=row["Composer"], notes=row["Notes"]
)

View File

@ -18,44 +18,54 @@ from interface.utils import sign_data
import logging
#from polyphonic.settings import LIBRARY_STORAGE
# from polyphonic.settings import LIBRARY_STORAGE
logger = logging.getLogger(__name__)
#try:
# try:
# library_storage = get_storage_class(settings.LIBRARY_STORAGE)()
#except (ImportError, AttributeError):
# except (ImportError, AttributeError):
# logger.exception("Failed to load library storage")
# library_storage = get_storage_class()()
#logger.info("Library storage: %s", library_storage.__class__.__name__)
# logger.info("Library storage: %s", library_storage.__class__.__name__)
# FIXME: move back to settings
library_storage = CachedStorage(BYOStorage())
class Orchestration(models.Model):
"""
Stores a list of instrument codes as a single entry (space delimited).
Can be global or ensemble specific
"""
collection = models.ForeignKey('Collection', on_delete=models.CASCADE, related_name="custom_orchestrations", null=True, blank=True)
collection = models.ForeignKey(
"Collection",
on_delete=models.CASCADE,
related_name="custom_orchestrations",
null=True,
blank=True,
)
name = models.CharField(max_length=100)
instruments = models.TextField()
def as_list(self):
tags = [ t.strip() for t in self.instruments.split(' ') ]
return [ (t, MusicTag.from_tag(t)) for t in tags if t ]
tags = [t.strip() for t in self.instruments.split(" ")]
return [(t, MusicTag.from_tag(t)) for t in tags if t]
def tag_order(self):
tags = [ t.strip() for t in self.instruments.split(' ') if t ]
order = {'score': 0}
tags = [t.strip() for t in self.instruments.split(" ") if t]
order = {"score": 0}
for i, t in enumerate(tags):
order.setdefault(t.strip('-0123456789'), i*2+1)
order.setdefault(t.strip("-0123456789"), i * 2 + 1)
return order
def sorter(self):
tag_order = self.tag_order()
def f(x):
return (tag_order.get(x[0].strip('-0123456789'), 1000), x[0])
return (tag_order.get(x[0].strip("-0123456789"), 1000), x[0])
return f
def save(self):
@ -65,75 +75,101 @@ class Orchestration(models.Model):
def __str__(self):
return self.name
class ProjectItem(models.Model):
"""
ProjectItem represents a Work attached to a Project e.g. item in set list or programme
It also allows works to be shared from one ensemble to another on a per-project basis.
"""
project = models.ForeignKey('interface.Project', on_delete=models.CASCADE, related_name='items')
work = models.ForeignKey('Work', on_delete=models.CASCADE, related_name='project_items')
project = models.ForeignKey(
"interface.Project", on_delete=models.CASCADE, related_name="items"
)
work = models.ForeignKey(
"Work", on_delete=models.CASCADE, related_name="project_items"
)
checkout = models.DateTimeField()
due = models.DateTimeField(null=True, blank=True)
returned = models.DateTimeField(null=True, blank=True)
approved_by = models.ForeignKey('auth.User', on_delete=models.CASCADE)
approved_by = models.ForeignKey("auth.User", on_delete=models.CASCADE)
order = models.SmallIntegerField(default=0)
section = models.CharField(max_length=100, blank=True)
class Meta:
ordering = ['order', 'work']
ordering = ["order", "work"]
def __str__(self):
return f"<{self.project_id}:{slugify(self.work.name)}>"
class Collection(models.Model):
"""
A logical collection of works, typically owned by an organisation or person (physical or virtual)
"""
name = models.CharField(max_length=255,
help_text="Name of the collection")
prefix = models.CharField(max_length=255, default="default",
help_text="Folder to store works in")
administrators = models.ManyToManyField('auth.User', related_name="collections",
help_text="Administrators for this collection")
location = models.CharField(max_length=100,
help_text="Physical location (institution, town...)", blank=True)
storage = models.ForeignKey('byostorage.UserStorage', on_delete=models.CASCADE, null=True, blank=True,
help_text="User storage for documents")
notes = models.TextField(blank=True,
help_text="Publicly visible notes about collection and loans policy (markdown format)")
settings = models.JSONField(default=dict, blank=True,
help_text="Storage specific settings")
nonce = models.SmallIntegerField(default=1,
help_text="Increment this to reset the authentication links")
name = models.CharField(max_length=255, help_text="Name of the collection")
prefix = models.CharField(
max_length=255, default="default", help_text="Folder to store works in"
)
administrators = models.ManyToManyField(
"auth.User",
related_name="collections",
help_text="Administrators for this collection",
)
location = models.CharField(
max_length=100, help_text="Physical location (institution, town...)", blank=True
)
storage = models.ForeignKey(
"byostorage.UserStorage",
on_delete=models.CASCADE,
null=True,
blank=True,
help_text="User storage for documents",
)
notes = models.TextField(
blank=True,
help_text="Publicly visible notes about collection and loans policy (markdown format)",
)
settings = models.JSONField(
default=dict, blank=True, help_text="Storage specific settings"
)
nonce = models.SmallIntegerField(
default=1, help_text="Increment this to reset the authentication links"
)
def meta(self, name):
items = WorkMeta.objects.filter(work__collection=self.pk, name=name).values_list('value', flat=True).distinct()
items = (
WorkMeta.objects.filter(work__collection=self.pk, name=name)
.values_list("value", flat=True)
.distinct()
)
return items
@property
def tags(self):
return self.meta('tag')
return self.meta("tag")
@property
def genres(self):
return self.meta('genre')
return self.meta("genre")
def has_administrator(self, user):
if not user.is_authenticated:
return False
if user.is_superuser:
return True
return user.pk in self.administrators.values_list('pk', flat=True)
return user.pk in self.administrators.values_list("pk", flat=True)
def get_absolute_url(self):
return resolve_url('collection_work_list', self.pk)
return resolve_url("collection_work_list", self.pk)
def auth(self):
return sign_data(f'{self.pk}-{self.nonce}', 12)
return sign_data(f"{self.pk}-{self.nonce}", 12)
def __str__(self):
return self.name
class EnsembleAccess(models.Model):
"""
Can have different access levels to a collection
@ -143,12 +179,18 @@ class EnsembleAccess(models.Model):
ACCESS_APPROVED = 2
ACCESS_TYPES = (
(ACCESS_UNLIMITED, 'Unlimited'),
(ACCESS_APPROVED, 'Approval required'),
(ACCESS_UNLIMITED, "Unlimited"),
(ACCESS_APPROVED, "Approval required"),
)
ensemble = models.ForeignKey('interface.Ensemble', on_delete=models.CASCADE, related_name="allowed_collections")
collection = models.ForeignKey(Collection, on_delete=models.CASCADE, related_name="allowed_ensembles")
ensemble = models.ForeignKey(
"interface.Ensemble",
on_delete=models.CASCADE,
related_name="allowed_collections",
)
collection = models.ForeignKey(
Collection, on_delete=models.CASCADE, related_name="allowed_ensembles"
)
access_type = models.PositiveSmallIntegerField(choices=ACCESS_TYPES, default=2)
class Meta:
@ -159,6 +201,7 @@ class Work(models.Model):
"""
A musical work 'owned' by a collection from a licencing perspective.
"""
LICENCE_PUBLIC = 2
LICENCE_EXPIRED = 4
LICENCE_RECORDING = 5
@ -167,37 +210,73 @@ class Work(models.Model):
LICENCE_NONE = 10
LICENCE_TYPES = (
(LICENCE_PUBLIC, 'Public Domain'),
(LICENCE_EXPIRED, 'Copyright Expired'),
(LICENCE_RECORDING, 'Recording Licence'),
(LICENCE_PERFORMANCE, 'Performance Licence'),
(LICENCE_PERUSAL, 'Perusal Licence'),
(LICENCE_NONE, 'Internal use only'),
(LICENCE_PUBLIC, "Public Domain"),
(LICENCE_EXPIRED, "Copyright Expired"),
(LICENCE_RECORDING, "Recording Licence"),
(LICENCE_PERFORMANCE, "Performance Licence"),
(LICENCE_PERUSAL, "Perusal Licence"),
(LICENCE_NONE, "Internal use only"),
)
name = models.CharField(max_length=255, help_text="Original name of the work")
edition = models.CharField(max_length=255, blank=True,
help_text="Edition details to distinguish multiple versions")
parent = models.ForeignKey('Work', null=True, blank=True, on_delete=models.SET_NULL, related_name="related_works",
help_text="Arrangement of another work or part of an anthology")
composer = models.CharField(max_length=255, default='Anon',
help_text="Composer or compilation editor. Use <b>Surname, Initial</b> for easy searching")
edition = models.CharField(
max_length=255,
blank=True,
help_text="Edition details to distinguish multiple versions",
)
parent = models.ForeignKey(
"Work",
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name="related_works",
help_text="Arrangement of another work or part of an anthology",
)
composer = models.CharField(
max_length=255,
default="Anon",
help_text="Composer or compilation editor. Use <b>Surname, Initial</b> for easy searching",
)
orchestration = models.ForeignKey(Orchestration, on_delete=models.SET_DEFAULT, default=1, help_text="Orchestration for the work")
original_parts = models.JSONField(default=dict, blank=True, help_text="Original printed parts (IMSLP format)")
orchestration = models.ForeignKey(
Orchestration,
on_delete=models.SET_DEFAULT,
default=1,
help_text="Orchestration for the work",
)
original_parts = models.JSONField(
default=dict, blank=True, help_text="Original printed parts (IMSLP format)"
)
# Collection details
collection = models.ForeignKey(Collection, on_delete=models.CASCADE, related_name="works")
code = models.CharField(max_length=100, blank=True, help_text="Collection specific code or number. Will be auto-generated if not supplied")
licence = models.PositiveSmallIntegerField(choices=LICENCE_TYPES, default=6, help_text="Copyright status")
max_projects = models.IntegerField(default=1, help_text="How many active projects can this work be attached to")
collection = models.ForeignKey(
Collection, on_delete=models.CASCADE, related_name="works"
)
code = models.CharField(
max_length=100,
blank=True,
help_text="Collection specific code or number. Will be auto-generated if not supplied",
)
licence = models.PositiveSmallIntegerField(
choices=LICENCE_TYPES, default=6, help_text="Copyright status"
)
max_projects = models.IntegerField(
default=1, help_text="How many active projects can this work be attached to"
)
# Extra info
running_time = models.DurationField(null=True, blank=True, help_text="Running time in mm:ss format")
running_time = models.DurationField(
null=True, blank=True, help_text="Running time in mm:ss format"
)
notes = models.TextField(blank=True)
# Allocation to projects
projects = models.ManyToManyField('interface.Project', through='ProjectItem', related_name="works", help_text="Current usage")
# Allocation to projects
projects = models.ManyToManyField(
"interface.Project",
through="ProjectItem",
related_name="works",
help_text="Current usage",
)
@property
def folder(self):
@ -205,19 +284,20 @@ class Work(models.Model):
def tagged_sections(self, *tags):
qs = self.docs.filter(sections__tag__in=tags)
qs = qs.annotate(Count('sections'), end=Min('sections__end'), start=Max('sections__start')) \
.filter(sections__count=len(tags))
qs = qs.annotate(
Count("sections"), end=Min("sections__end"), start=Max("sections__start")
).filter(sections__count=len(tags))
return qs
def list_sections(self, *tags):
return list(self.tagged_sections(*tags).values_list('upload', 'start', 'end'))
return list(self.tagged_sections(*tags).values_list("upload", "start", "end"))
@property
def digital_parts(self):
sections = [ (s.tag, s.name) for s in Section.objects.filter(doc__work=self.pk) ]
sections = [(s.tag, s.name) for s in Section.objects.filter(doc__work=self.pk)]
sections.sort(key=self.orchestration.sorter())
#return [ s[1] for s in sections ]
sections = list(dict(sections).items()) # primitive unique()
# return [ s[1] for s in sections ]
sections = list(dict(sections).items()) # primitive unique()
return sections
def pdfs(self):
@ -229,19 +309,21 @@ class Work(models.Model):
return []
parts = list(self.original_parts.items())
parts.sort(key=self.orchestration.sorter())
return [ (MusicTag.from_tag(x[0]), x[1]) for x in parts ]
return [(MusicTag.from_tag(x[0]), x[1]) for x in parts]
@property
def tags(self):
return self.meta_info.filter(name='tag').values_list('value', flat=True)
return self.meta_info.filter(name="tag").values_list("value", flat=True)
@property
def meta(self):
return self.meta_info.exclude(name='tag')
return self.meta_info.exclude(name="tag")
@property
def current_loans(self):
return self.project_items.filter(checkout__lte=now(), returned=None).select_related('project')
return self.project_items.filter(
checkout__lte=now(), returned=None
).select_related("project")
@cached_property
def loans(self):
@ -259,29 +341,31 @@ class Work(models.Model):
@property
def available(self):
if self.max_projects < 0:
return 'Unlimited'
return "Unlimited"
a = self.max_projects - self.loans
return '{0} of {1}'.format(max(a, 0), self.max_projects)
return "{0} of {1}".format(max(a, 0), self.max_projects)
@property
def identifier(self):
if self.code:
return self.code;
return self.code
composer = self.composer or "Anon"
composer = re.sub('[^\w]', '', composer)
composer = re.sub("[^\w]", "", composer)
words = self.name.split()
work = words[0][:3]
return f"{composer[:4]}-{work}-{self.pk:05d}".upper()
def assigned_instruments(self):
return Section.objects.filter(doc__work_id=self.pk).values_list('tag', flat=True)
return Section.objects.filter(doc__work_id=self.pk).values_list(
"tag", flat=True
)
def unassigned_instruments(self):
assigned = set(self.assigned_instruments())
return [ x for x in self.orchestration.as_list() if not x[0] in assigned ]
return [x for x in self.orchestration.as_list() if not x[0] in assigned]
def music_tags(self):
tags = dict(self.orchestration.as_list())
@ -290,31 +374,32 @@ class Work(models.Model):
return tags.items()
def __str__(self):
return f"{self.name} ({self.composer})"
class WorkMeta(models.Model):
class WorkMeta(models.Model):
META_CHOICES = (
('tag', 'Tag'),
('arr', 'Arranger'),
('lyrics', 'Lyracist'),
('genre', 'Genre'),
('style', 'Style'),
('orchestration', 'Orchestration'),
("tag", "Tag"),
("arr", "Arranger"),
("lyrics", "Lyracist"),
("genre", "Genre"),
("style", "Style"),
("orchestration", "Orchestration"),
)
work = models.ForeignKey(Work, on_delete=models.CASCADE, related_name='meta_info')
work = models.ForeignKey(Work, on_delete=models.CASCADE, related_name="meta_info")
name = models.SlugField(max_length=20, choices=META_CHOICES)
value = models.CharField(max_length=255)
def doc_upload_filename(doc, filename):
collection = doc.work.collection
storage = collection.storage
if not storage:
raise RuntimeError("Collection has no storage attached")
return f'{storage}:library/{collection.prefix}/{doc.work.folder}/{filename}'
return f"{storage}:library/{collection.prefix}/{doc.work.folder}/{filename}"
class Document(models.Model):
"""
@ -324,22 +409,22 @@ class Document(models.Model):
DOCTYPE_PDF = 1
DOCTYPE_AUDIO = 2
DOCTYPE_VIDEO = 3
DOCTYPE_MISC= 4
DOCTYPE_MISC = 4
DOCTYPES = (
(DOCTYPE_PDF, 'PDF'),
(DOCTYPE_AUDIO, 'Audio'),
(DOCTYPE_VIDEO, 'Video'),
(DOCTYPE_MISC, 'Misc'),
(DOCTYPE_PDF, "PDF"),
(DOCTYPE_AUDIO, "Audio"),
(DOCTYPE_VIDEO, "Video"),
(DOCTYPE_MISC, "Misc"),
)
DOCTYPE_MAP = {
'.pdf': DOCTYPE_PDF,
'.mp3': DOCTYPE_AUDIO,
'.mp4': DOCTYPE_VIDEO,
".pdf": DOCTYPE_PDF,
".mp3": DOCTYPE_AUDIO,
".mp4": DOCTYPE_VIDEO,
}
work = models.ForeignKey('Work', on_delete=models.CASCADE, related_name="docs")
work = models.ForeignKey("Work", on_delete=models.CASCADE, related_name="docs")
doctype = models.PositiveSmallIntegerField(choices=DOCTYPES, default=DOCTYPE_PDF)
upload = models.FileField(upload_to=doc_upload_filename, storage=library_storage)
created = models.DateTimeField(auto_now_add=True)
@ -355,6 +440,7 @@ class Document(models.Model):
def __str__(self):
return self.upload.name
class Section(models.Model):
"""
Section is a tagged portion of a Document
@ -365,20 +451,21 @@ class Section(models.Model):
PAGE_RIGHT = 2
PAGE_PREFERENCE = (
(PAGE_AUTO, 'auto'),
(PAGE_LEFT, 'left'),
(PAGE_RIGHT, 'right'),
(PAGE_AUTO, "auto"),
(PAGE_LEFT, "left"),
(PAGE_RIGHT, "right"),
)
doc = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="sections")
tag = models.CharField(max_length=50, blank=True)
start = models.SmallIntegerField(null=True, blank=True)
end = models.SmallIntegerField(null=True, blank=True)
page = models.SmallIntegerField(default=PAGE_AUTO, choices=PAGE_PREFERENCE) # NOT CURRENTLY USED
page = models.SmallIntegerField(
default=PAGE_AUTO, choices=PAGE_PREFERENCE
) # NOT CURRENTLY USED
class Meta:
ordering = ['doc', 'start', 'pk']
ordering = ["doc", "start", "pk"]
@property
def music_tag(self):
@ -390,11 +477,11 @@ class Section(models.Model):
@property
def bulma_class(self):
return "success" if self.music_tag.is_general else 'info'
return "success" if self.music_tag.is_general else "info"
@property
def filename(self):
return slugify(f'{self.doc.work.name} - {self.name}').title() + '.pdf'
return slugify(f"{self.doc.work.name} - {self.name}").title() + ".pdf"
@property
def pagerange(self):
@ -406,4 +493,3 @@ class Section(models.Model):
def __str__(self):
return self.name

View File

@ -1,4 +1,3 @@
from collections import namedtuple
GENERAL = """
@ -158,20 +157,21 @@ zith Zither
MUSIC_TAGS = []
GENERAL_TAGS = set()
for i, abbreviations in enumerate((GENERAL, INSTRUMENTS)):
for line in abbreviations.split('\n'):
for line in abbreviations.split("\n"):
parts = line.strip().split(maxsplit=1)
if len(parts) < 2: continue
name, _, _ = parts[1].partition('(')
if len(parts) < 2:
continue
name, _, _ = parts[1].partition("(")
MUSIC_TAGS.append((parts[0], name))
if i == 0:
GENERAL_TAGS.add(parts[0])
MUSIC_NAME_BY_TAG = dict(MUSIC_TAGS)
MUSIC_TAG_BY_NAME = dict( ( (x[1].lower(), x[0]) for x in MUSIC_TAGS ) )
MUSIC_TAG_BY_NAME = dict(((x[1].lower(), x[0]) for x in MUSIC_TAGS))
class MusicTag(namedtuple('MusicTag', ('name', 'variant'), defaults=[None])):
class MusicTag(namedtuple("MusicTag", ("name", "variant"), defaults=[None])):
@classmethod
def from_tag(cls, tag):
"""
@ -186,13 +186,13 @@ class MusicTag(namedtuple('MusicTag', ('name', 'variant'), defaults=[None])):
>>> MusicTag.from_tag('pce-A2')
MusicTag(name='Piece', variant='A2')
"""
abbr, _, variant = tag.partition('-')
abbr, _, variant = tag.partition("-")
name = MUSIC_NAME_BY_TAG.get(abbr.lower(), abbr)
if variant:
return cls(name, variant)
return cls(name, None)
@property
def tag(self):
l = self.name.lower()
@ -206,7 +206,7 @@ class MusicTag(namedtuple('MusicTag', ('name', 'variant'), defaults=[None])):
>>> MusicTag('Violin', 2).is_general
False
"""
return self.tag in GENERAL_TAGS
return self.tag in GENERAL_TAGS
def abbreviate(self):
"""
@ -231,12 +231,15 @@ class MusicTag(namedtuple('MusicTag', ('name', 'variant'), defaults=[None])):
return f"{self.name} {self.variant}"
return self.name
import re
PATTERNS = [re.compile('([A-Za-z]+)[_\- ]*(\d+)'), re.compile('([A-Za-z]+)()')]
PATTERNS = [re.compile("([A-Za-z]+)[_\- ]*(\d+)"), re.compile("([A-Za-z]+)()")]
def auto_tag(filename):
'''
"""
>>> auto_tag('Ode to Joy - Violin 1.pdf')
MusicTag(name='Violin', variant=1)
>>> auto_tag('Ode to Joy_Cello.pdf')
@ -247,7 +250,7 @@ def auto_tag(filename):
MusicTag(name='Viola', variant=None)
>>> auto_tag('Ode to Joy - fl-2 (piccolo).pdf')
MusicTag(name='Flute', variant=2)
'''
"""
for pattern in PATTERNS:
for inst, ordinal in pattern.findall(filename):
@ -257,9 +260,9 @@ def auto_tag(filename):
return MusicTag(inst.title(), ordinal)
if inst in MUSIC_NAME_BY_TAG:
return MusicTag(MUSIC_NAME_BY_TAG[inst], ordinal)
if __name__ == "__main__":
import doctest
print(doctest.testmod())
print(doctest.testmod())

View File

@ -5,22 +5,23 @@ import string
SAFECHARS = string.ascii_letters + string.digits + " _-"
def extract_pages(source, bookmark, start=None, end=None, count=1):
return extract_and_concat([(source, bookmark, start, end, count)])
def extract_and_concat(items):
# create a temporary directory for our sections
d = tempfile.TemporaryDirectory(prefix="polyphonic_")
pdfmarks = os.path.join(d.name, 'pdfmarks.txt')
marks = open(pdfmarks, 'w')
pdfmarks = os.path.join(d.name, "pdfmarks.txt")
marks = open(pdfmarks, "w")
sections = []
current_page = 1
for i, (source, bookmark, start, end, count) in enumerate(items):
if count == 0:
continue
@ -28,23 +29,34 @@ def extract_and_concat(items):
sections.append(source)
else:
if not end:
end = start
dest = os.path.join(d.name, f'section_{i}.pdf')
dest = os.path.join(d.name, f"section_{i}.pdf")
cmd = ['gs', '-sDEVICE=pdfwrite', '-dBATCH', '-dNOPAUSE',
f'-dFirstPage={start}', f'-dLastPage={end}',
f'-sOutputFile={dest}',
source]
cmd = [
"gs",
"-sDEVICE=pdfwrite",
"-dBATCH",
"-dNOPAUSE",
f"-dFirstPage={start}",
f"-dLastPage={end}",
f"-sOutputFile={dest}",
source,
]
bookmark = "".join(filter(lambda c: c in SAFECHARS, bookmark))
marks.write(f'[/Title ({bookmark}) /Page {current_page} /OUT pdfmark\n')
marks.write(f"[/Title ({bookmark}) /Page {current_page} /OUT pdfmark\n")
p = subprocess.run(cmd, check=True, capture_output=True)
pages = len([ x for x in p.stdout.splitlines() if x.decode('utf8').startswith('Page ')])
pages = len(
[
x
for x in p.stdout.splitlines()
if x.decode("utf8").startswith("Page ")
]
)
for j in range(count):
sections.append(dest)
current_page += pages
@ -52,10 +64,9 @@ def extract_and_concat(items):
marks.close()
# concat the items
output = tempfile.NamedTemporaryFile(prefix="polyphonic_", suffix='.pdf')
output = tempfile.NamedTemporaryFile(prefix="polyphonic_", suffix=".pdf")
cmd = ['gs', '-sDEVICE=pdfwrite', '-q', '-dBATCH', '-dNOPAUSE',
'-sOutputFile=-']
cmd = ["gs", "-sDEVICE=pdfwrite", "-q", "-dBATCH", "-dNOPAUSE", "-sOutputFile=-"]
cmd.extend(sections)
cmd.append(pdfmarks)

View File

@ -1,4 +1,4 @@
from interface.tests import AccessTestCase
from interface.tests import AccessTestCase
from byostorage.user import UserStorage
from . import models
@ -7,87 +7,101 @@ from .views.api import WorkSerializer
import tempfile
import json
class LibraryTestCase(AccessTestCase):
class LibraryTestCase(AccessTestCase):
USERS = (
{'username': 'admin', 'password': 'secret', 'is_superuser': True, 'is_staff': True},
{'username': 'homer', 'password': 'maggie'},
{
"username": "admin",
"password": "secret",
"is_superuser": True,
"is_staff": True,
},
{"username": "homer", "password": "maggie"},
)
ENSEMBLES = (
{'name': 'The Be Sharps', 'slug': 'be-sharps', 'admins': ['homer']},
{'name': 'Lisa & the Bleeding Gums', 'slug': 'bleeding-gums'},
{'name': 'Party Posse'},
{"name": "The Be Sharps", "slug": "be-sharps", "admins": ["homer"]},
{"name": "Lisa & the Bleeding Gums", "slug": "bleeding-gums"},
{"name": "Party Posse"},
)
PROJECTS = (
{'name': 'Baker St', 'ensemble': 'bleeding-gums', 'when': -12},
{'name': 'Navy Recruitment Day', 'ensemble': 'party-posse', 'when': 6},
{'name': 'Barbershop Contest', 'ensemble': 'be-sharps', 'when': 28},
{'name': 'Open Mic Night', 'ensemble': 'bleeding-gums', 'when': 1 },
{"name": "Baker St", "ensemble": "bleeding-gums", "when": -12},
{"name": "Navy Recruitment Day", "ensemble": "party-posse", "when": 6},
{"name": "Barbershop Contest", "ensemble": "be-sharps", "when": 28},
{"name": "Open Mic Night", "ensemble": "bleeding-gums", "when": 1},
)
COLLECTIONS = (
{'name': 'Springfield Elementary Library', 'prefix': 'sel'},
{'name': 'Neds Library', 'prefix': 'ned', 'admins': ['homer']},
{"name": "Springfield Elementary Library", "prefix": "sel"},
{"name": "Neds Library", "prefix": "ned", "admins": ["homer"]},
)
WORKS = (
{'name': 'Baby on Board', 'collection': 'ned', 'docs': [{'upload': 'local:baby_on_board.pdf'}]},
{'name': 'Star Spangled Banner', 'collection': 'sel'},
{
"name": "Baby on Board",
"collection": "ned",
"docs": [{"upload": "local:baby_on_board.pdf"}],
},
{"name": "Star Spangled Banner", "collection": "sel"},
)
PROTECTED_URLS = (
'/collections/1',
'/collections/1/add',
'/collections/2/works/1',
'/collections/2/works/1/edit',
'/collections/2/works/1/partset',
'/collections/2/works/1/add_to_project',
'/collections/2/works/1/upload',
'/collections/2/docs/1/annotate',
"/collections/1",
"/collections/1/add",
"/collections/2/works/1",
"/collections/2/works/1/edit",
"/collections/2/works/1/partset",
"/collections/2/works/1/add_to_project",
"/collections/2/works/1/upload",
"/collections/2/docs/1/annotate",
# Need to add storage before we can test these
'/api/collections/2',
'/api/collections/2/works/1',
'/admin/library/collection/',
'/admin/library/document/',
'/admin/library/ensembleaccess/',
'/admin/library/orchestration/',
'/admin/library/projectitem/',
'/admin/library/work/',
"/api/collections/2",
"/api/collections/2/works/1",
"/admin/library/collection/",
"/admin/library/document/",
"/admin/library/ensembleaccess/",
"/admin/library/orchestration/",
"/admin/library/projectitem/",
"/admin/library/work/",
)
@classmethod
def setUpTestData(cls):
super().setUpTestData()
cls.temp_dir = tempfile.TemporaryDirectory()
cls.storage = UserStorage.objects.create(name='local', storage='django.core.files.storage.FileSystemStorage',
settings_data=json.dumps({'location': cls.temp_dir.name, 'base_url': 'file://' + cls.temp_dir.name}))
cls.storage = UserStorage.objects.create(
name="local",
storage="django.core.files.storage.FileSystemStorage",
settings_data=json.dumps(
{
"location": cls.temp_dir.name,
"base_url": "file://" + cls.temp_dir.name,
}
),
)
cls.collections = {}
for details in cls.COLLECTIONS:
admins = details.pop('admins', [])
admins = details.pop("admins", [])
obj = models.Collection.objects.create(storage=cls.storage, **details)
for admin in admins:
obj.administrators.add(cls.users[admin])
cls.collections[details['prefix']] = obj
cls.collections[details["prefix"]] = obj
cls.works = {}
for details in cls.WORKS:
collection = cls.collections[details.pop('collection')]
#details.setdefault('docs', [])
#details.setdefault('meta_info', [])
#s = WorkSerializer(data=details)
#assert s.is_valid(), s.errors
#s.save(collection_id=collection.pk)
docs = details.pop('docs', [])
collection = cls.collections[details.pop("collection")]
# details.setdefault('docs', [])
# details.setdefault('meta_info', [])
# s = WorkSerializer(data=details)
# assert s.is_valid(), s.errors
# s.save(collection_id=collection.pk)
docs = details.pop("docs", [])
obj = models.Work.objects.create(collection=collection, **details)
for doc in docs:
obj.docs.create(**doc)
cls.works[details['name']] = obj
cls.works[details["name"]] = obj
def setUp(self):
pass
@ -95,82 +109,105 @@ class LibraryTestCase(AccessTestCase):
@classmethod
def tearDownClass(cls):
cls.temp_dir.cleanup()
def test_integration(self):
pass
def test_superuser_access(self):
self.login('admin', 'secret')
self.assertAccess({
'/collections': True,
'/collections/1': True,
'/collections/2/works/1': True,
})
self.login("admin", "secret")
self.assertAccess(
{
"/collections": True,
"/collections/1": True,
"/collections/2/works/1": True,
}
)
def test_administrator_access(self):
self.login('homer', 'maggie')
self.assertAccess({
'/collections': True,
'/collections/1': False,
'/collections/2': True,
'/collections/2/works/1': True,
})
self.login("homer", "maggie")
self.assertAccess(
{
"/collections": True,
"/collections/1": False,
"/collections/2": True,
"/collections/2/works/1": True,
}
)
def test_link_access(self):
self.assertAccess({
'/collections': True,
'/collections/1': False,
'/collections/2': False,
'/collections/2/works/1': False,
})
self.assertAccess(
{
"/collections": True,
"/collections/1": False,
"/collections/2": False,
"/collections/2/works/1": False,
}
)
self.authorize(models.Collection, pk=2)
self.assertAccess({
'/collections': True,
'/collections/1': False,
'/collections/2': True,
'/collections/2/works/1': True,
})
self.assertAccess(
{
"/collections": True,
"/collections/1": False,
"/collections/2": True,
"/collections/2/works/1": True,
}
)
def test_anon_access(self):
self.assertAccess({
'/collections': True,
'/collections/1': False,
'/collections/2': False,
'/collections/2/works/1': False,
})
self.assertAccess(
{
"/collections": True,
"/collections/1": False,
"/collections/2": False,
"/collections/2/works/1": False,
}
)
def test_export_and_import(self):
self.login('admin', 'secret')
data = self.client.get('/api/collections/1/works/2', HTTP_ACCEPT="application/json").json()
response = self.client.post('/api/collections/2/import', data, "application/json")
self.assertEqual(response.status_code, 201)
self.login("admin", "secret")
data = self.client.get(
"/api/collections/1/works/2", HTTP_ACCEPT="application/json"
).json()
response = self.client.post(
"/api/collections/2/import", data, "application/json"
)
self.assertEqual(response.status_code, 201)
def test_movement_from_large_work(self):
'''
"""
Will be common to store a work which has several movements, but the project is only going to play one.
This also should give us the ability to store an anthology as one Work have Project reference 'no:23'
'''
"""
work = self.collections['sel'].works.create(name="Some Quartet", composer="Beethoven")
for g in ('vl-1', 'vl-2', 'vla', 'vc'):
doc = work.docs.create(upload=f'sel/beethoven/some_quartet/some_quartet_{g}.pdf')
doc.sections.create(tag='mvmt-1', start=1, end=3)
doc.sections.create(tag='mvmt-2', start=4, end=8)
doc.sections.create(tag='mvmt-3', start=9, end=12)
work = self.collections["sel"].works.create(
name="Some Quartet", composer="Beethoven"
)
for g in ("vl-1", "vl-2", "vla", "vc"):
doc = work.docs.create(
upload=f"sel/beethoven/some_quartet/some_quartet_{g}.pdf"
)
doc.sections.create(tag="mvmt-1", start=1, end=3)
doc.sections.create(tag="mvmt-2", start=4, end=8)
doc.sections.create(tag="mvmt-3", start=9, end=12)
doc.sections.create(tag=g)
# no tags - get nothing (should it be everything?)
self.assertEqual(work.list_sections(), [])
# single tag - should get just that range
self.assertEqual(work.list_sections('vl-1'), [('sel/beethoven/some_quartet/some_quartet_vl-1.pdf', None, None)])
self.assertEqual(
work.list_sections("vl-1"),
[("sel/beethoven/some_quartet/some_quartet_vl-1.pdf", None, None)],
)
# single tag - returns all documents with that range
result = work.list_sections('mvmt-2')
result = work.list_sections("mvmt-2")
self.assertEqual(len(result), 4)
# multiple tags - returns the overlapping portion of all documents that have all tags
self.assertEqual(work.list_sections('vl-1', 'mvmt-2'), [('sel/beethoven/some_quartet/some_quartet_vl-1.pdf', 4, 8)])
self.assertEqual(work.list_sections('vl-1', 'vl-2'), [])
self.assertEqual(
work.list_sections("vl-1", "mvmt-2"),
[("sel/beethoven/some_quartet/some_quartet_vl-1.pdf", 4, 8)],
)
self.assertEqual(work.list_sections("vl-1", "vl-2"), [])

View File

@ -6,42 +6,120 @@ from . import views
from library.views import api
#router = routers.DefaultRouter()
#router.register(r'collection', external.CollectionViewSet, basename="collection")
#router.register(r'work', external.WorkViewSet, basename="work")
# router = routers.DefaultRouter()
# router.register(r'collection', external.CollectionViewSet, basename="collection")
# router.register(r'work', external.WorkViewSet, basename="work")
urlpatterns = [
path('projects/<int:project>/items', views.ProjectItemListView.as_view(), name="item_list"),
path('projects/<int:project>/items/manage', views.ProjectItemManageView.as_view(), name="item_list_manage"),
path('projects/<int:project>/items/append', views.ProjectItemAddView.as_view(), name="item_list_append"),
path('library', views.LibraryWorkListView.as_view(), name="work_list"),
path('collections', views.CollectionListView.as_view(), name="collection_list"),
path('collections/<int:collection>', views.CollectionWorkListView.as_view(), name="collection_work_list"),
path('collections/<int:collection>/add', views.WorkAddView.as_view(), name="work_add"),
path('collections/<int:collection>/works/<int:pk>', views.WorkDetailView.as_view(), name="work_detail"),
path('collections/<int:collection>/works/<int:pk>/edit', views.WorkUpdateView.as_view(), name="work_edit"),
path('collections/<int:collection>/works/<int:pk>/partset', views.WorkPartSetView.as_view(), name="work_partset"),
path('collections/<int:collection>/works/<int:pk>/parts', views.WorkPartsView.as_view(), name="work_parts"),
path('collections/<int:collection>/works/<int:pk>/add_to_project', views.WorkAddToProject.as_view(), name="work_add_to_project"),
path('collections/<int:collection>/works/<int:pk>/upload', views.WorkAddDocumentView.as_view(), name="document_add"),
path('collections/<int:collection>/works/<int:pk>/download', views.WorkDownloadView.as_view(), name="work_download"),
path('collections/<int:collection>/docs/<int:pk>/delete', views.DocumentDeleteView.as_view(), name="document_delete"),
path('collections/<int:collection>/docs/<int:pk>/download', views.DocumentDownloadView.as_view(), name="document_download"),
path('collections/<int:collection>/docs/<int:pk>/annotate', views.DocumentAnnotateView.as_view(), name="document_annotate"),
path('collections/<int:collection>/download/<int:section>/<str:filename>', views.PartDownloadView.as_view(), name="part_download"),
path('collections/<int:collection>/browse', views.StorageBrowserView.as_view(), name="storage_browser"),
path('collections/<int:collection>/browse/<path:folder>', views.StorageBrowserView.as_view(), name="storage_browser_folder"),
#path('api/', include(router.urls))
path('api/collections/<int:pk>', api.CollectionExportView.as_view(), name="collection_export"),
path('api/collections/<int:collection>/works/<int:pk>', api.WorkExportView.as_view(), name="work_export"),
path('api/collections/<int:collection>/import', api.WorkImportView.as_view(), name="work_import"),
path('api/collections/<int:collection>/bulk_import', api.CollectionImportView.as_view(), name="collection_import"),
path(
"projects/<int:project>/items",
views.ProjectItemListView.as_view(),
name="item_list",
),
path(
"projects/<int:project>/items/manage",
views.ProjectItemManageView.as_view(),
name="item_list_manage",
),
path(
"projects/<int:project>/items/append",
views.ProjectItemAddView.as_view(),
name="item_list_append",
),
path("library", views.LibraryWorkListView.as_view(), name="work_list"),
path("collections", views.CollectionListView.as_view(), name="collection_list"),
path(
"collections/<int:collection>",
views.CollectionWorkListView.as_view(),
name="collection_work_list",
),
path(
"collections/<int:collection>/add", views.WorkAddView.as_view(), name="work_add"
),
path(
"collections/<int:collection>/works/<int:pk>",
views.WorkDetailView.as_view(),
name="work_detail",
),
path(
"collections/<int:collection>/works/<int:pk>/edit",
views.WorkUpdateView.as_view(),
name="work_edit",
),
path(
"collections/<int:collection>/works/<int:pk>/partset",
views.WorkPartSetView.as_view(),
name="work_partset",
),
path(
"collections/<int:collection>/works/<int:pk>/parts",
views.WorkPartsView.as_view(),
name="work_parts",
),
path(
"collections/<int:collection>/works/<int:pk>/add_to_project",
views.WorkAddToProject.as_view(),
name="work_add_to_project",
),
path(
"collections/<int:collection>/works/<int:pk>/upload",
views.WorkAddDocumentView.as_view(),
name="document_add",
),
path(
"collections/<int:collection>/works/<int:pk>/download",
views.WorkDownloadView.as_view(),
name="work_download",
),
path(
"collections/<int:collection>/docs/<int:pk>/delete",
views.DocumentDeleteView.as_view(),
name="document_delete",
),
path(
"collections/<int:collection>/docs/<int:pk>/download",
views.DocumentDownloadView.as_view(),
name="document_download",
),
path(
"collections/<int:collection>/docs/<int:pk>/annotate",
views.DocumentAnnotateView.as_view(),
name="document_annotate",
),
path(
"collections/<int:collection>/download/<int:section>/<str:filename>",
views.PartDownloadView.as_view(),
name="part_download",
),
path(
"collections/<int:collection>/browse",
views.StorageBrowserView.as_view(),
name="storage_browser",
),
path(
"collections/<int:collection>/browse/<path:folder>",
views.StorageBrowserView.as_view(),
name="storage_browser_folder",
),
# path('api/', include(router.urls))
path(
"api/collections/<int:pk>",
api.CollectionExportView.as_view(),
name="collection_export",
),
path(
"api/collections/<int:collection>/works/<int:pk>",
api.WorkExportView.as_view(),
name="work_export",
),
path(
"api/collections/<int:collection>/import",
api.WorkImportView.as_view(),
name="work_import",
),
path(
"api/collections/<int:collection>/bulk_import",
api.CollectionImportView.as_view(),
name="collection_import",
),
]

View File

@ -1,6 +1,7 @@
"""
Views relating to importing and exporting collection items
"""
"""
from interface.views import EnsembleMixin
from library.views import WorkMixin
@ -49,22 +50,24 @@ import os.path
from django.db import transaction
from django.core.files.uploadedfile import TemporaryUploadedFile
class WorkMetaSerializer(serializers.ModelSerializer):
class Meta:
model = WorkMeta
exclude = ['id', 'work']
exclude = ["id", "work"]
def to_representation(self, instance):
return f"{instance.name}:{instance.value}"
def to_internal_value(self, data):
name, _, value = data.partition(':')
return super().to_internal_value({'name': name, 'value': value})
name, _, value = data.partition(":")
return super().to_internal_value({"name": name, "value": value})
class SectionSerializer(serializers.ModelSerializer):
class Meta:
model = Section
exclude = ['id', 'doc']
exclude = ["id", "doc"]
def to_representation(self, instance):
start = instance.start or 0
@ -79,14 +82,14 @@ class SectionSerializer(serializers.ModelSerializer):
start = None
if end < 1:
end = None
return super().to_internal_value({'tag': tag, 'start': start, 'end': end})
return super().to_internal_value({"tag": tag, "start": start, "end": end})
class DocumentSerializer(serializers.ModelSerializer):
upload = serializers.URLField()
sections = SectionSerializer(many=True)
#def to_internal_value(self, data):
# def to_internal_value(self, data):
# r = requests.get(data['upload'], stream=True)
# with tempfile.NamedTemporaryFile('wb') as f:
# shutil.copyfileobj(r.raw, f)
@ -96,7 +99,7 @@ class DocumentSerializer(serializers.ModelSerializer):
def to_representation(self, instance):
data = super().to_representation(instance)
data['upload'] = instance.upload.url
data["upload"] = instance.upload.url
return data
def create(self, validated_data):
@ -115,35 +118,40 @@ class DocumentSerializer(serializers.ModelSerializer):
model = Document
exclude = ["id", "work", "version", "created"]
# Serializers define the API representation.
class WorkSerializer(serializers.ModelSerializer):
docs = DocumentSerializer(many=True)
meta_info = WorkMetaSerializer(many=True)
class Meta:
model = Work
exclude = ['id', 'collection', 'projects', 'parent']
exclude = ["id", "collection", "projects", "parent"]
def create(self, validated):
with transaction.atomic():
docs = validated.pop('docs', [])
meta = validated.pop('meta_info', [])
docs = validated.pop("docs", [])
meta = validated.pop("meta_info", [])
work = Work.objects.create(**validated)
for d in docs:
sections = d.pop('sections', [])
sections = d.pop("sections", [])
url = urllib.parse.urlparse(d['upload'])
url = urllib.parse.urlparse(d["upload"])
filename = os.path.basename(url.path)
r = requests.get(d['upload'], stream=True)
r = requests.get(d["upload"], stream=True)
if r.status_code != 200:
raise APIException("Failed to download file")
f = TemporaryUploadedFile(filename, r.headers['content-type'], r.headers.get('content-length'), r.encoding)
f = TemporaryUploadedFile(
filename,
r.headers["content-type"],
r.headers.get("content-length"),
r.encoding,
)
shutil.copyfileobj(r.raw, f.file)
r.close()
d['upload'] = f
d["upload"] = f
doc = Document.objects.create(work_id=work.pk, **d)
for s in sections:
@ -154,22 +162,24 @@ class WorkSerializer(serializers.ModelSerializer):
return work
class CollectionSerializer(serializers.Serializer):
works = WorkSerializer(many=True)
def create(self, validated):
s = WorkSerializer()
print(validated)
collection = validated['collection_id']
collection = validated["collection_id"]
with transaction.atomic():
for work in validated['works']:
work['collection_id'] = collection
for work in validated["works"]:
work["collection_id"] = collection
s.create(work)
return Collection.objects.get(pk=collection)
from rest_framework import generics
class CollectionExportView(AuthorizedResourceMixin, generics.RetrieveAPIView):
serializer_class = CollectionSerializer
@ -178,23 +188,26 @@ class CollectionExportView(AuthorizedResourceMixin, generics.RetrieveAPIView):
return Collection.objects.all()
return Collection.objects.filter(administrators=self.request.user)
class WorkExportView(AuthorizedResourceMixin, generics.RetrieveAPIView):
serializer_class = WorkSerializer
def get_queryset(self):
works = Work.objects.filter(collection=self.kwargs['collection'])
works = Work.objects.filter(collection=self.kwargs["collection"])
if self.request.user.is_superuser:
return works
return works
return works.filter(collection__administrators=self.request.user)
class WorkImportView(AuthorizedResourceMixin, generics.CreateAPIView):
serializer_class = WorkSerializer
def perform_create(self, serializer):
serializer.save(collection_id=self.kwargs['collection'])
serializer.save(collection_id=self.kwargs["collection"])
class CollectionImportView(AuthorizedResourceMixin, generics.CreateAPIView):
serializer_class = CollectionSerializer
def perform_create(self, serializer):
serializer.save(collection_id=self.kwargs['pk'])
serializer.save(collection_id=self.kwargs["pk"])

View File

@ -25,6 +25,7 @@ packages = [{include = "*", from="app"}]
[tool.poetry.group.dev.dependencies]
django-debug-toolbar = "5.2"
ruff = "^0.15.12"
[tool.poetry.scripts]
manage = "manage:main"