151 lines
4.1 KiB
Python
151 lines
4.1 KiB
Python
import subprocess
|
|
import pathlib
|
|
import sys
|
|
|
|
from pyn.config import Config
|
|
from pyn.common import logger
|
|
|
|
from datetime import datetime
|
|
from shutil import copyfileobj
|
|
from collections import namedtuple
|
|
from typing import Any, Generator
|
|
|
|
Entry = namedtuple("Entry", ("notebook", "filename"))
|
|
SearchResult = namedtuple("SearchResult", ("filename", "line_number", "snippet"))
|
|
|
|
NOTE_HEADING = """---
|
|
date: {date}
|
|
notebook: {notebook}
|
|
---
|
|
"""
|
|
|
|
TEMPLATES = {
|
|
"heading": NOTE_HEADING,
|
|
"note": "\n# {title}\n\n{body}\n---\n",
|
|
"todo": "* [ ] {item}\n",
|
|
}
|
|
|
|
|
|
def slugify(s):
|
|
return s.lower().replace(" ", "-")
|
|
|
|
|
|
def get_note_filename(
|
|
config: Config, title: str, notebook: str | None = None, create: bool = True
|
|
) -> pathlib.Path:
|
|
now = datetime.now()
|
|
|
|
notebook = slugify(notebook or config.default_notebook)
|
|
|
|
notebook_path = config.directory / notebook
|
|
note_path = (
|
|
notebook_path / f"{now.year}-{now.month:02d}-{now.day:02d}-{slugify(title)}.md"
|
|
)
|
|
|
|
if not note_path.is_file() and create:
|
|
notebook_path.mkdir(parents=True, exist_ok=True)
|
|
note_path.touch()
|
|
append_template(
|
|
note_path,
|
|
"heading",
|
|
date=now.isoformat(timespec="seconds"),
|
|
notebook=notebook,
|
|
)
|
|
|
|
return note_path
|
|
|
|
|
|
def append_template(filename: pathlib.Path, template: str, **context: Any) -> None:
|
|
rendered = TEMPLATES[template].format(**context)
|
|
with open(filename, "a") as f:
|
|
f.write(rendered)
|
|
|
|
|
|
def edit_file(config: Config, filename: pathlib.Path) -> None:
|
|
cmd = config.editor.copy()
|
|
cmd.append(str(filename))
|
|
subprocess.check_call(cmd)
|
|
|
|
|
|
def get_file_list(
|
|
config: Config, notebook: str | None = None, show_hidden: bool = False
|
|
) -> Generator[pathlib.Path]:
|
|
root = config.directory / notebook if notebook else config.directory
|
|
|
|
for d, dirs, files in root.walk():
|
|
if not show_hidden:
|
|
dirs[:] = [d for d in dirs if d[0] != "."]
|
|
|
|
for filename in files:
|
|
yield d / filename
|
|
|
|
|
|
def search_filenames(
|
|
config: Config, title: str, notebook: str | None = None
|
|
) -> pathlib.Path:
|
|
root = config.directory / notebook if notebook else config.directory
|
|
|
|
cmd = ["fzf", "-1", "-i"]
|
|
if title:
|
|
cmd.extend(["-q", title])
|
|
try:
|
|
output = subprocess.check_output(cmd, cwd=root)
|
|
file_path = root / output.decode("utf-8").strip()
|
|
return file_path
|
|
except subprocess.CalledProcessError as e:
|
|
if e.returncode == 130:
|
|
raise RuntimeError("User cancelled")
|
|
raise
|
|
|
|
|
|
def file_entry(config: Config, filepath: pathlib.Path) -> Entry:
|
|
relative = filepath.relative_to(config.directory)
|
|
return Entry(str(relative.parent), relative.name)
|
|
|
|
|
|
def cat_files(*filenames: pathlib.Path) -> None:
|
|
for filename in filenames:
|
|
with open(filename, "r") as f:
|
|
copyfileobj(f, sys.stdout)
|
|
|
|
|
|
def search_contents(
|
|
config: Config,
|
|
query: str,
|
|
notebook: str | None = None,
|
|
file_matcher: str = "*.md",
|
|
full_line: bool = False,
|
|
) -> Generator[SearchResult]:
|
|
root = config.directory / notebook if notebook else config.directory
|
|
|
|
cmd = ["grep", "-rFn", query, str(root)]
|
|
if file_matcher:
|
|
cmd.append(f"--include={file_matcher}")
|
|
if full_line:
|
|
cmd.append("-x")
|
|
|
|
logger.debug("Executing: %r", cmd)
|
|
try:
|
|
output = subprocess.check_output(cmd)
|
|
for line in output.decode("utf-8").strip().split("\n"):
|
|
filename, lineno, snippet = line.split(":", 2)
|
|
yield SearchResult(pathlib.Path(filename), int(lineno), snippet)
|
|
except subprocess.CalledProcessError as e:
|
|
if e.returncode == 1:
|
|
return
|
|
raise
|
|
|
|
|
|
def run_git(config: Config, cmd: list[str], quiet: bool = False) -> str:
|
|
if not (config.directory / ".git").is_dir():
|
|
if quiet:
|
|
return ""
|
|
raise RuntimeError("Notes not currently tracked")
|
|
|
|
cmd = ["git", "-C", str(config.directory)] + cmd
|
|
try:
|
|
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
|
|
return output.decode("utf-8").strip()
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError(str(e))
|