# -*- coding: utf-8 -*-
import os
import gettext
import json
import codecs
import fnmatch
import unicodecsv as csv
import pkg_resources
import datetime
from docx import Document
from docx.shared import Inches
from docx.enum.style import WD_STYLE_TYPE
import egaia_config
import egaia_list
import egaia_parsefn
import egaia_meta
import egaia_root
# FIXME: For efficiency, just load everything into memory first?

# Archive-wide settings, read once at import time.
language = egaia_config.getConfig(section='archive', key='language')

# Localized labels for the metadata terms this module handles specially.
term_title = egaia_config.getConfig(section='terms', key='DCTERMS.title')
term_identifier = egaia_config.getConfig(section='terms',
                                         key='DCTERMS.identifier')
term_original_filename = egaia_config.getConfig(section='terms',
                                                key='original_filename')
term_type = egaia_config.getConfig(section='terms', key='DCTERMS.type')
term_toc = egaia_config.getConfig(section='terms',
                                  key='DCTERMS.tableOfContents')

# The configured archive name is a UTF-8 byte string; keep it as unicode.
archive_name = egaia_config.getConfig(section='archive',
                                      key='archive_name').decode('utf-8')

# Path of the .docx template shipped inside the egaia package.
template = pkg_resources.resource_filename('egaia', 'static/template.docx')
def getCoreFields(filtered=True):
    """Return the labels of the core metadata fields, in configured order.

    The comma-separated ``core_metadata`` entry of the ``archive`` config
    section lists term keys; each key is looked up in the ``terms`` section
    and its label is returned. Terms without a label are dropped.

    :param filtered: when true (the default), terms whose key contains
        ``tableOfContents`` are left out, so that a new, blank document does
        not get an empty table.
    :returns: list of field labels.
    """
    configured = egaia_config.getConfig(section='archive', key='core_metadata')
    labels = []
    for raw_term in configured.split(','):
        is_toc = 'tableOfContents' in raw_term
        if not (filtered and is_toc):
            label = egaia_config.getConfig('terms', raw_term.strip())
            if label:
                labels.append(label)
    return labels
def parseDocx(docx_file):
    """Process a docx file and return a dict of metadata, or None.

    Paragraphs are grouped by the heading that precedes them:

    * a paragraph whose style name contains ``Title`` is stored under the
      configured title label as a one-element list;
    * a paragraph whose style name contains ``Heading`` starts a new field
      named after its text;
    * every other paragraph is appended to the current field's value list.
      Text that appears before the first heading is discarded.

    The rows of *all* tables in the document are collected, in document
    order, under the table-of-contents label as a list of rows (each row a
    list of cell texts). Rows are padded with empty strings to a common
    width so the result is rectangular.

    :param docx_file: path of the document to read.
    :returns: dict mapping field label to a list of values, or ``None`` when
        the path does not exist or the document cannot be opened.
    """
    if not os.path.exists(docx_file):
        return None
    try:
        document = Document(docx_file)
    except Exception:
        # Best effort: report the unreadable file and let the caller skip
        # it. Unlike a bare ``except``, this lets KeyboardInterrupt and
        # SystemExit propagate.
        print("Could not open %s" % docx_file)
        return None

    data = []
    key = None
    val = []
    for para in document.paragraphs:
        style_name = para.style.name
        if 'Title' in style_name:
            data.append((term_title, [para.text]))
        elif 'Heading' in style_name:
            # flush the field collected so far, then start the next one
            if key and val:
                data.append((key, val))
            key = para.text
            val = []
        else:
            val.append(para.text)
    # flush the last field
    if key and val:
        data.append((key, val))

    # Merge the rows of every table into one value. Storing each table
    # under the same key would keep only the last one once the pairs are
    # turned into a dict.
    tables = document.tables
    if tables:
        rows = []
        for table in tables:
            for row in table.rows:
                # cell.text is newline-delimited text, like a csv cell
                rows.append([cell.text for cell in row.cells])
        width = max(len(r) for r in rows) if rows else 0
        for r in rows:
            r.extend([''] * (width - len(r)))
        data.append((term_toc, rows))
    return dict(data)
def _dedupe(values):
    """Return *values* without repeats, keeping first occurrences in order."""
    unique = []
    for value in values:
        # list membership rather than a set: values may be unhashable rows
        if value not in unique:
            unique.append(value)
    return unique


def _add_thumbnail(document, uuid):
    """Insert the first 'df-med' file listed for *uuid*, if there is one."""
    thumbs = egaia_list.listFiles(uuid=uuid, filter_type='df-med')
    if not thumbs:
        return
    image = thumbs[0]
    dimensions = egaia_meta.getDimensions(image)
    if not dimensions:
        return
    try:
        # NOTE(review): assumes a 'WIDTHxHEIGHT' string -- confirm against
        # egaia_meta.getDimensions. Compare numerically: as strings,
        # '1000' > '800' is False and landscape images came out as portrait.
        width, height = [float(n) for n in dimensions.split('x')[:2]]
        if width > height:
            # landscape mode
            document.add_picture(image, width=Inches(6))
        else:
            # portrait mode
            document.add_picture(image, height=Inches(4))
    except Exception:
        print("Error adding image %s to the document" % image)


def _add_values(document, values):
    """Write a non-empty list value as a table or as plain paragraphs."""
    if all(isinstance(item, list) for item in values):
        # a list of lists is tabular data; size the table for the widest
        # row so that no row runs past the last column
        columns = max(len(row) for row in values)
        table = document.add_table(0, columns, style=None)
        for row in values:
            cells = table.add_row().cells
            for index, text in enumerate(row):
                cells[index].text = text
        table.autofit = True
    else:
        for item in values:
            document.add_paragraph(item)


def writeDocx(filename, append=None, write=None, out_file=None, force=False):
    """Write metadata to a docx file built from the packaged template.

    Existing metadata is first read from *filename* with ``parseDocx`` and
    then modified according to *write* and/or *append* before the document
    is regenerated and saved.

    :param filename: docx file to read existing metadata from.
    :param append: JSON object string; its values are added to the existing
        values of each field, dropping empty values and duplicates. Applied
        whenever given; *force* is not consulted for it.
    :param write: JSON object string; its fields replace the existing ones.
        Without *force* this is a dry run: differing fields are printed and
        the function returns without saving anything.
    :param out_file: path to save to; defaults to *filename*.
    :param force: actually apply *write* instead of doing a dry run.
    :returns: ``None``.
    """
    print("Processing %s..." % filename)
    if not out_file:
        out_file = filename
    data = dict()
    existing = parseDocx(filename)
    if existing:
        data.update(existing)

    if write:
        new_data = json.loads(write)
        if not force:
            # overwrite data as a dry-run by default.
            print("Dry-run. Use the --force flag to write data.")
            for field in new_data:
                if field not in data:
                    continue
                if sorted(data[field]) != sorted(new_data[field]):
                    print("OLD: %s" % data[field])
                    print("NEW: %s" % new_data[field])
                    print("")
            return
        data.update(new_data)

    if append:
        for field, values in json.loads(append).items():
            if not isinstance(values, list):
                values = [values]
            # filter out empty values
            values = [item for item in values if item]
            current = data.get(field)
            if not current:
                current = []
            elif not isinstance(current, list):
                current = [current]
            data[field] = _dedupe(current + values)

    document = Document(template)
    # clear the body. The template document should actually contain text
    # so that the styles are not identified as latent.
    document._body.clear_content()
    document.core_properties.modified = datetime.datetime.utcnow()
    # LibreOffice uses "Heading3", not "Heading 3"
    h3 = document.styles['Heading3']

    uuid = data.get(term_identifier, '')
    if isinstance(uuid, list):
        # an empty list means "no identifier", not an error
        uuid = uuid[0] if uuid else ''
    if data:
        title = data.get(term_title, [''])
        if isinstance(title, list):
            doc_title = title[0] if title else ''
        else:
            doc_title = title
        # FIXME: unicode errors in using the actual title!
        # docx/oxml/coreprops.py, line 299: value = str(value)
        document.core_properties.title = uuid
        document.add_heading(doc_title, 0)
        _add_thumbnail(document, uuid)

    # core fields first, in their configured order, even when empty
    fields_list = getCoreFields()
    for field in fields_list:
        if term_title in field:
            # we already have this as the document title
            continue
        content = ''
        if data:
            content = data.get(field, '')
        document.add_paragraph(field, style=h3)
        if isinstance(content, list):
            # filter out the empty elements
            for item in [entry for entry in content if entry]:
                document.add_paragraph(item)
        else:
            document.add_paragraph(content)

    # process non-default metadata fields
    for field, value in data.items():
        if field in fields_list or term_title in field:
            continue
        if isinstance(value, list):
            # filter out the empty elements; skip fields left with nothing
            value = [entry for entry in value if entry]
            if not value:
                continue
            document.add_paragraph(field, style=h3)
            _add_values(document, value)
        else:
            document.add_paragraph(field, style=h3)
            document.add_paragraph(value)

    print("Writing to %s..." % out_file)
    document.save(out_file)
def docx2csv(docx_files, csv_file):
    """Convert docx metadata files to one CSV file.

    Each readable document becomes one row. Only the core fields returned
    by ``getCoreFields()`` become columns; other fields are ignored by the
    writer. List values are written as newline-delimited text in a single
    cell; tabular values (lists of rows) additionally have the cells of
    each row joined with tabs.

    :param docx_files: iterable of docx paths.
    :param csv_file: path of the CSV file to create.
    :returns: ``None``.
    """
    metadata = []
    for fn in docx_files:
        parsed = parseDocx(fn)
        if parsed is None:
            # parseDocx returns None for files it cannot read; skip them
            # instead of failing on the whole export
            continue
        row = {}
        for key, value in parsed.items():
            if isinstance(value, list):
                # join() needs strings, so flatten table rows first
                lines = ['\t'.join(item) if isinstance(item, list) else item
                         for item in value]
                value = '\n'.join(lines)
            row[key] = value
        metadata.append(row)

    # a separate name for the handle keeps the ``csv_file`` path intact
    with open(csv_file, 'wb') as handle:
        writer = csv.DictWriter(handle, fieldnames=getCoreFields(),
                                extrasaction='ignore')
        writer.writeheader()
        writer.writerows(metadata)
    return
def docx2json(docx_files, json_file=None):
    """Convert docx metadata files to JSON.

    Every readable document contributes one object to a JSON list, in the
    order of *docx_files*. Files that ``parseDocx`` cannot read (it returns
    ``None`` for them) are left out rather than exported as ``null``.

    :param docx_files: iterable of docx paths.
    :param json_file: optional output path. When given, the JSON is written
        there as UTF-8 (sorted keys, 4-space indent, non-ASCII characters
        kept as-is) and nothing is returned.
    :returns: the JSON text when *json_file* is not given, else ``None``.
    """
    metadata = []
    for fn in docx_files:
        parsed = parseDocx(fn)
        if parsed is not None:
            metadata.append(parsed)

    if not json_file:
        return json.dumps(metadata)

    with codecs.open(json_file, mode='w', encoding='utf-8') as json_doc:
        json_doc.write(json.dumps(metadata, sort_keys=True, indent=4,
                                  ensure_ascii=False))
    return
def loadCsv(csv_path):
    """Load a metadata csv with the DictReader and return a list of dicts.

    :param csv_path: path of the CSV file; it is opened in binary mode.
    :returns: list with one dict per data row, or ``None`` when the file
        cannot be opened or parsed.
    """
    try:
        with open(csv_path, 'rb') as csvfile:
            # Copy to a list since the reader requires an open file
            return list(csv.DictReader(csvfile))
    except Exception:
        # Deliberately best effort: any read or parse problem is reported
        # as None. ``except Exception`` (not a bare ``except``) lets
        # KeyboardInterrupt and SystemExit through.
        return None
def csv2docx(csv_file, force=True):
    """Convert the rows of a csv file to docx metadata files.

    Each row is written with ``writeDocx``. A row whose type column
    contains "collection" goes to ``metadata-<language>.docx`` in the
    archive root; any other row goes next to the first original file listed
    for its identifier, as
    ``<basename>.metadata-<language>.<identifier>.docx``. Rows without an
    identifier, or whose original cannot be found, are skipped.

    :param csv_file: path of the CSV file to read.
    :param force: passed through to ``writeDocx``; when false the write is
        only a dry run.
    :returns: ``None``.
    """
    metadata = loadCsv(csv_file)
    if not metadata:
        print("Error reading input metadata!")
        return

    for row in metadata:
        fields = dict()
        for key, value in row.items():
            if key is None:
                # DictReader collects cells beyond the header under the
                # key None; they have no field label to be written under
                continue
            if value is None:
                # DictReader fills cells missing from a short row with None
                value = ''
            if not isinstance(value, list):
                # multi-line cells hold one value per line
                value = value.splitlines() if '\n' in value else [value]
            fields[key] = value

        # FIXME: create identifier if missing
        uuid = row.get(term_identifier)
        itemtype = row.get(term_type) or ''
        if 'collection' in itemtype.lower():
            # this should be the collection metadata
            docx_filename = os.path.join(egaia_root.get_root(),
                                         'metadata-%s.docx' % language)
        else:
            # locate the original filename and place the docx file next to it
            originals = egaia_list.listFiles(uuid=uuid,
                                             filter_type='originals')
            if not originals:
                print("Could not find item in the current collection: %s"
                      % uuid)
                continue
            orig = originals[0]
            basename = egaia_parsefn.getBasename(orig)
            if not uuid or not basename:
                continue
            docx_fn = '.'.join([basename, 'metadata-%s' % language,
                                uuid, 'docx'])
            docx_filename = os.path.join(os.path.dirname(orig), docx_fn)

        # utf-8 is already json.dumps' default input encoding on Python 2
        md = json.dumps(fields)
        writeDocx(docx_filename, write=md, force=force)
    return
def update(update_existing=False):
    """Add new metadata files to the collection.

    For every original file or directory, a docx metadata file named
    ``<basename>.metadata-<language>.<uuid>.docx`` is written next to it
    when no listed metadata file path contains the item's uuid.

    :param update_existing: also rewrite the metadata file of items that
        already have one; the flag is passed to ``getMetadata`` as
        ``restrict`` as well.
    """
    originals = (egaia_list.listFiles(filter_type='originals')
                 + egaia_list.listDirs(filter_type='originals'))
    metadata_files = egaia_list.listFiles(filter_type='metadata')

    for filename in originals:
        uuid = egaia_parsefn.getUuid(filename)
        already_described = any(uuid in path for path in metadata_files)
        if already_described and not update_existing:
            continue

        basename = egaia_parsefn.getBasename(filename)
        name_parts = [basename, 'metadata-%s' % language, uuid, 'docx']
        out_file = os.path.join(os.path.dirname(filename),
                                '.'.join(name_parts))
        metadata = egaia_meta.getMetadata(filename, restrict=update_existing)
        writeDocx(out_file, write=json.dumps(metadata), force=True)
def filter_files(pattern):
    """Return a list of metadata files from a globbing pattern.

    The current working directory is walked recursively. A file is returned
    when its name matches *pattern* (``fnmatch`` rules) and contains
    ``metadata-``.

    :param pattern: shell-style wildcard pattern applied to file names.
    :returns: list of full paths.
    """
    matches = []
    for root, _dirnames, filenames in os.walk(os.getcwd()):
        for filename in fnmatch.filter(filenames, pattern):
            # Test the file name, not the joined path: otherwise a parent
            # directory named e.g. "metadata-export" would make every file
            # beneath it look like a metadata file.
            if 'metadata-' in filename:
                matches.append(os.path.join(root, filename))
    return matches
def _cli(args):
    """egaia docx

    Create and update docx-format documents containing metadata for archive
    items. Export and import metadata to and from CSV and json.

    Usage:
      egaia docx --help
      egaia docx --new
      egaia docx --update
      egaia docx --to-csv=CSV-FILE [ DOCX ]
      egaia docx --from-csv=CSV-FILE [ --force ]
      egaia docx --to-json=JSON-FILE [ DOCX ]
      egaia docx --from-json=JSON-STR [ --append | --force ] [ DOCX ]
    """
    # without an explicit DOCX pattern, operate on every metadata file
    if not args['DOCX']:
        args['DOCX'] = '*'

    # the first option that is set decides the action
    if args['--new']:
        update()
        return
    if args['--update']:
        update(update_existing=True)
        return
    if args['--from-csv']:
        csv2docx(args['--from-csv'], force=args['--force'])
        return
    if args['--to-csv']:
        docx2csv(filter_files(args['DOCX']), args['--to-csv'])
        return
    if args['--to-json']:
        docx2json(filter_files(args['DOCX']), json_file=args['--to-json'])
        return
    if not args['--from-json']:
        return

    # add and update operations: the JSON string is either appended to the
    # existing metadata or written over it
    if args['--append']:
        append, write = args['--from-json'], None
    else:
        append, write = None, args['--from-json']
    for fn in filter_files(args['DOCX']):
        writeDocx(filename=fn, append=append, write=write,
                  force=args['--force'])