Source code for egaia.egaia_docx

# -*- coding: utf-8 -*-

import os
import gettext
import json
import codecs
import fnmatch
import unicodecsv as csv
import pkg_resources
import datetime

from docx import Document
from docx.shared import Inches
from docx.enum.style import WD_STYLE_TYPE

import egaia_config
import egaia_list
import egaia_parsefn
import egaia_meta
import egaia_root

# FIXME: For efficiency, just load everything into memory first?
# Archive-wide settings, read from the egaia configuration once at import time.
# The language code is interpolated into metadata filenames ('metadata-%s').
language = egaia_config.getConfig('archive', 'language')
# Labels configured for the metadata terms. The functions below use them as
# keys into the metadata dicts (e.g. term_title for the document title and
# term_toc for tabular data).
term_title = egaia_config.getConfig('terms', 'DCTERMS.title')
term_identifier = egaia_config.getConfig('terms', 'DCTERMS.identifier')
term_original_filename = egaia_config.getConfig('terms', 'original_filename')
term_type = egaia_config.getConfig('terms', 'DCTERMS.type')
term_toc = egaia_config.getConfig('terms', 'DCTERMS.tableOfContents')
# NOTE(review): .decode() implies getConfig() returns a byte string here
# (Python 2); a missing value would presumably raise at import -- confirm.
archive_name = egaia_config.getConfig('archive', 'archive_name').decode('utf-8')

# Path of the packaged template document that writeDocx() opens as its base.
template = pkg_resources.resource_filename('egaia', 'static/template.docx')

def getCoreFields(filtered=True):
    """Return the labels of the core metadata fields, in configured order.

    The comma-separated term keys are read from the 'core_metadata' entry of
    the 'archive' config section, and each key is looked up in the 'terms'
    section to obtain its label. Keys without a label are left out.

    When *filtered* is true, any key containing 'tableOfContents' is skipped:
    that field holds tabular data, and a new, blank document should not get
    an empty table built for it.
    """
    configured_terms = egaia_config.getConfig(section='archive',
                                              key='core_metadata')
    labels = []
    for raw_term in configured_terms.split(','):
        skip_toc = filtered and 'tableOfContents' in raw_term
        if not skip_toc:
            localized = egaia_config.getConfig('terms', raw_term.strip())
            if localized:
                labels.append(localized)
    return labels
def parseDocx(docx_file):
    """Process a docx file and return a dict of metadata, or None.

    Paragraphs are walked in document order:

    * a paragraph whose style name contains 'Title' is stored under
      ``term_title`` as a one-element list;
    * a paragraph whose style name contains 'Heading' starts a new field
      named after the heading text;
    * any other paragraph is appended to the value list of the current field
      (text that precedes the first heading is discarded).

    Rows of every table in the document are collected, in order, into a
    single list of rows (each row a list of cell strings) stored under
    ``term_toc``. Rows are padded with empty strings to the width of the
    widest row so the merged data stays rectangular.

    Returns None when the file does not exist or cannot be opened.
    """
    if not os.path.exists(docx_file):
        return None
    try:
        document = Document(docx_file)
    except Exception:
        # Best effort: an unreadable file is reported and skipped, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        print("Could not open %s" % docx_file)
        return None

    data = []
    key = None
    val = []
    for para in document.paragraphs:
        style_name = para.style.name
        if 'Title' in style_name:
            data.append((term_title, [para.text]))
        elif 'Heading' in style_name:
            # flush the previous field before starting a new one
            if key and val:
                data.append((key, val))
            key = para.text
            val = []
        else:
            val.append(para.text)
    # flush the last field
    if key and val:
        data.append((key, val))

    # Merge all tables into ONE entry. Appending one (term_toc, rows) pair
    # per table would leave only the last table after dict(data), silently
    # dropping the others.
    tables = document.tables
    if tables:
        rows = []
        for table in tables:
            for row in table.rows:
                # cell.text is a string with newline-delimited text,
                # like a csv cell
                rows.append([cell.text for cell in row.cells])
        width = max(len(r) for r in rows) if rows else 0
        for r in rows:
            r.extend([''] * (width - len(r)))
        data.append((term_toc, rows))
    return dict(data)
def _is_landscape(dimensions):
    """Return True when a 'WIDTHxHEIGHT' string describes a landscape image."""
    # Compare numerically: as strings, '1000' sorts before '800'.
    width, height = dimensions.split('x')[:2]
    return float(width) > float(height)


def writeDocx(filename, append=None, write=None, out_file=None, force=False):
    """Write a dict to docx.

    The metadata currently stored in *filename* (if it can be parsed) is
    combined with the supplied changes and written out as a fresh document
    built from the packaged template.

    filename -- docx file to read existing metadata from.
    append   -- JSON object string; its values are added to the existing
                values of each field, dropping empty values and duplicates.
    write    -- JSON object string; its fields replace the existing fields.
    out_file -- destination path; defaults to *filename*.
    force    -- only relevant with *write*: when false, the differences for
                fields that already exist are printed and nothing is written.

    Returns None.
    """
    print("Processing %s..." % filename)
    if not out_file:
        out_file = filename

    data = dict()
    d = parseDocx(filename)
    if d:
        data.update(d)

    if write:
        new_data = json.loads(write)
        if not force:
            # dry-run by default: report what would change, write nothing
            print("Dry-run. Use the --force flag to write data.")
            for x in new_data:
                if x not in data:
                    continue
                if sorted(data[x]) != sorted(new_data[x]):
                    print("OLD: %s" % data[x])
                    print("NEW: %s" % new_data[x])
                    print("")
            return
        data.update(new_data)

    if append:
        json_data = json.loads(append)
        for k, v in json_data.items():
            if not isinstance(v, list):
                v = [v]
            # filter out empty values
            v = [item for item in v if item]
            if not data.get(k):
                data[k] = list()
            elif not isinstance(data[k], list):
                data[k] = [data[k]]
            # order-preserving de-duplication; values may be unhashable
            merged = []
            for item in data[k] + v:
                if item not in merged:
                    merged.append(item)
            data[k] = merged

    document = Document(template)
    # clear the body. The template document should actually contain text
    # so that the styles are not identified as latent.
    document._body.clear_content()
    document.core_properties.modified = datetime.datetime.utcnow()
    # LibreOffice uses "Heading3", not "Heading 3"
    h3 = document.styles['Heading3']

    uuid = data.get(term_identifier, '')
    if isinstance(uuid, list):
        # an empty list must not raise IndexError
        uuid = uuid[0] if uuid else ''

    if data:
        title = data.get(term_title, [''])
        if isinstance(title, list):
            doc_title = title[0] if title else ''
        else:
            doc_title = title
        # FIXME: unicode errors in using the actual title!
        # docx/oxml/coreprops.py, line 299: value = str(value)
        document.core_properties.title = uuid
        document.add_heading(doc_title, 0)

    thumb = egaia_list.listFiles(uuid=uuid, filter_type='df-med')
    if thumb:
        # NB thumb is a list
        image = thumb[0]
        dimensions = egaia_meta.getDimensions(image)
        if dimensions:
            try:
                if _is_landscape(dimensions):
                    document.add_picture(image, width=Inches(6))
                else:
                    document.add_picture(image, height=Inches(4))
            except Exception:
                # best effort: a broken thumbnail must not block the write
                print("Error adding image %s to the document" % image)

    fields_list = getCoreFields()
    for field in fields_list:
        if term_title in field:
            # we already have this as the document title
            continue
        content = ''
        if data:
            content = data.get(field, '')
        document.add_paragraph(field, style=h3)
        if isinstance(content, list):
            # filter out the empty elements
            for entry in [c for c in content if c]:
                document.add_paragraph(entry)
        else:
            document.add_paragraph(content)

    # process non-default metadata fields
    if data:
        for k, v in data.items():
            if k in fields_list or term_title in k:
                continue
            if isinstance(v, list):
                # filter out the empty elements
                v = [item for item in v if item]
                if len(v) < 1:
                    continue
                document.add_paragraph(k, style=h3)
                if all(isinstance(i, list) for i in v):
                    # a list of lists is tabular data. Size the table for
                    # the widest row so longer rows cannot overflow it.
                    columns = max(len(row) for row in v)
                    table = document.add_table(0, columns, style=None)
                    for row in v:
                        cells = table.add_row().cells
                        for count, elem in enumerate(row):
                            cells[count].text = elem
                    table.autofit = True
                else:
                    for entry in v:
                        document.add_paragraph(entry)
            else:
                document.add_paragraph(k, style=h3)
                document.add_paragraph(v)

    print("Writing to %s..." % out_file)
    document.save(out_file)
def _csv_cell(value):
    """Flatten a parsed field value into a single csv cell string."""
    if not isinstance(value, list):
        return value
    lines = []
    for item in value:
        if isinstance(item, list):
            # a table row: '\n'.join() cannot take nested lists, so join
            # the row's cells with tabs first
            item = '\t'.join(item)
        lines.append(item)
    return '\n'.join(lines)


def docx2csv(docx_files, csv_file):
    """Convert docx to csv.

    Each file in *docx_files* is parsed with parseDocx() and becomes one row
    of *csv_file*. List values are written as newline-delimited text inside
    a single cell; rows of tabular values are tab-delimited. Only the core
    fields returned by getCoreFields() become columns; other fields are
    ignored. Files that parseDocx() cannot read are skipped.

    Returns None.
    """
    metadata = []
    for fn in docx_files:
        row_data = parseDocx(fn)
        if row_data is None:
            # missing or unreadable document
            continue
        metadata.append(dict((k, _csv_cell(v)) for k, v in row_data.items()))

    with open(csv_file, 'wb') as csv_handle:
        writer = csv.DictWriter(csv_handle, fieldnames=getCoreFields(),
                                extrasaction='ignore')
        writer.writeheader()
        writer.writerows(metadata)
    return
def docx2json(docx_files, json_file=None):
    """Serialize the metadata of several docx files as a JSON array.

    Every path in *docx_files* is run through parseDocx() and the results are
    kept in input order. Without *json_file* the compact JSON string is
    returned. With *json_file* an indented, key-sorted, UTF-8 encoded
    document is written to that path and None is returned.
    """
    parsed = [parseDocx(path) for path in docx_files]
    if json_file:
        with codecs.open(json_file, mode='w', encoding='utf-8') as handle:
            handle.write(json.dumps(parsed, sort_keys=True, indent=4,
                                    ensure_ascii=False))
        return None
    return json.dumps(parsed)
def loadCsv(csv_path):
    """Load metadata csv using the DictReader. Return a list of dicts.

    Returns None when the file cannot be opened or parsed. The caller is
    expected to report the failure.
    """
    try:
        with open(csv_path, 'rb') as csvfile:
            # Materialize the rows: the reader needs the file to stay open.
            return list(csv.DictReader(csvfile))
    except Exception:
        # Deliberately best-effort, but no longer a bare except, so
        # KeyboardInterrupt and SystemExit propagate.
        return None
def csv2docx(csv_file, force=True):
    """Convert csv to docx files.

    Every row of *csv_file* is written to the metadata docx of the item it
    describes. A row whose type field contains 'collection' goes to the
    collection-level 'metadata-<language>.docx' in the archive root. Any
    other row goes next to the item's original file, located through the
    row's identifier. Rows whose item cannot be located are skipped.

    Multi-line cells are split into lists of lines. *force* is passed on to
    writeDocx(); when false only a dry-run report is printed.

    Returns None.
    """
    metadata = loadCsv(csv_file)
    if not metadata:
        print("Error reading input metadata!")
        return

    for row in metadata:
        row2 = dict()
        for k, v in row.items():
            if v is None:
                # DictReader fills the missing cells of a short row with
                # None; treat them like empty cells instead of crashing
                v = ''
            if not isinstance(v, list):
                if '\n' in v:
                    v = v.splitlines()
                else:
                    v = [v]
            row2[k] = v

        # FIXME: create identifier if missing
        uuid = row.get(term_identifier)
        itemtype = row.get(term_type) or ''
        if 'collection' in itemtype.lower():
            # this should be the collection metadata
            docx_filename = os.path.join(egaia_root.get_root(),
                                         'metadata-%s.docx' % language)
        else:
            # locate the original filename and place the docx file next to it
            originals = egaia_list.listFiles(uuid=uuid,
                                             filter_type='originals')
            if not originals:
                print("Could not find item in the current collection: %s"
                      % uuid)
                continue
            orig = originals[0]
            basename = egaia_parsefn.getBasename(orig)
            if not uuid or not basename:
                continue
            docx_fn = '.'.join([basename, 'metadata-%s' % language,
                                uuid, 'docx'])
            docx_filename = os.path.join(os.path.dirname(orig), docx_fn)

        md = json.dumps(row2, encoding='utf-8')
        writeDocx(docx_filename, write=md, force=force)
    return
def update(update_existing=False):
    """Create metadata documents for tagged items in the collection.

    All files and directories listed as 'originals' are visited. An item
    counts as already described when its uuid occurs in the path of any
    listed metadata file. Such items are skipped unless *update_existing* is
    true, in which case their documents are rewritten as well (with
    getMetadata() called in restricted mode).
    """
    originals = egaia_list.listFiles(filter_type='originals')
    tagged = originals + egaia_list.listDirs(filter_type='originals')
    existing_docs = egaia_list.listFiles(filter_type='metadata')

    for tagged_path in tagged:
        uuid = egaia_parsefn.getUuid(tagged_path)
        already_described = any(uuid in doc for doc in existing_docs)
        if already_described and not update_existing:
            continue
        stem = egaia_parsefn.getBasename(tagged_path)
        doc_name = '.'.join([stem, 'metadata-%s' % language, uuid, 'docx'])
        target = os.path.join(os.path.dirname(tagged_path), doc_name)
        item_metadata = egaia_meta.getMetadata(tagged_path,
                                               restrict=update_existing)
        writeDocx(target, write=json.dumps(item_metadata), force=True)
def collectionMetadata():
    """Return one JSON string holding the metadata of every listed docx.

    The metadata files are obtained from egaia_list and serialized with
    docx2json().
    """
    return docx2json(egaia_list.listFiles(filter_type="metadata"))
def filter_files(pattern):
    """Return a list of metadata files from a globbing pattern.

    The current working directory is walked recursively. A file is returned
    (as a full path) when its name matches *pattern* and contains
    'metadata-'.
    """
    matches = []
    for root, _dirnames, filenames in os.walk(os.getcwd()):
        for filename in fnmatch.filter(filenames, pattern):
            # Test the file name, not the joined path: otherwise a directory
            # called 'metadata-...' would make every file below it match.
            if 'metadata-' in filename:
                matches.append(os.path.join(root, filename))
    return matches
def _cli(args): """egaia docx Create and update docx-format documents containing metadata for archive items. Export and import metadata to and from CSV and json. Usage: egaia docx --help egaia docx --new egaia docx --update egaia docx --to-csv=CSV-FILE [ DOCX ] egaia docx --from-csv=CSV-FILE [ --force ] egaia docx --to-json=JSON-FILE [ DOCX ] egaia docx --from-json=JSON-STR [ --append | --force ] [ DOCX ] """ if not args['DOCX']: args['DOCX'] = '*' if args['--new']: update() elif args['--update']: update(update_existing=True) elif args['--from-csv']: csv2docx(args['--from-csv'], force=args['--force']) elif args['--to-csv']: docx2csv(filter_files(args['DOCX']), args['--to-csv']) elif args['--to-json']: docx2json(filter_files(args['DOCX']), json_file=args['--to-json']) elif args['--from-json']: # add and update operations append = None write = None if args['--append']: append = args['--from-json'] else: write = args['--from-json'] for fn in filter_files(args['DOCX']): writeDocx( filename=fn, append=append, write=write, force=args['--force'] )