diff --git a/exiftool.py b/exiftool.py index 1d1e3ae..7d84831 100644 --- a/exiftool.py +++ b/exiftool.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # PyExifTool -# Copyright 2012 Sven Marnach +# Copyright 2012 Sven Marnach. Enhancements by Leo Broska # This file is part of PyExifTool. # @@ -55,11 +55,15 @@ d["EXIF:DateTimeOriginal"])) """ +from __future__ import unicode_literals + import sys import subprocess import os import json import warnings +import logging +import codecs try: # Py3k compatibility basestring @@ -82,14 +86,23 @@ # some cases. block_size = 4096 +# constants related to keywords manipulations +KW_TAGNAME = "IPTC:Keywords" +KW_REPLACE, KW_ADD, KW_REMOVE = range(3) + + # This code has been adapted from Lib/os.py in the Python source tree # (sha1 265e36e277f3) def _fscodec(): encoding = sys.getfilesystemencoding() - if encoding == "mbcs": - errors = "strict" - else: - errors = "surrogateescape" + errors = "strict" + if encoding != "mbcs": + try: + codecs.lookup_error("surrogateescape") + except LookupError: + pass + else: + errors = "surrogateescape" def fsencode(filename): """ @@ -107,12 +120,47 @@ def fsencode(filename): fsencode = _fscodec() del _fscodec +#string helper +def strip_nl (s): + return ' '.join(s.splitlines()) + + +# Error checking function +# Note: They are quite fragile, beacsue teh just parse the output text from exiftool +def check_ok (result): + """Evaluates the output from a exiftool write operation (e.g. `set_tags`) + + The argument is the result from the execute method. + + The result is True or False. + """ + return not result is None and (not "due to errors" in result) + +def format_error (result): + """Evaluates the output from a exiftool write operation (e.g. `set_tags`) + + The argument is the result from the execute method. + + The result is a human readable one-line string. + """ + if check_ok (result): + return 'exiftool finished probably properly. ("%s")' % strip_nl(result) + else: + if result is None: + return "exiftool operation can't be evaluated: No result given" + else: + return 'exiftool finished with error: "%s"' % strip_nl(result) + + class ExifTool(object): """Run the `exiftool` command-line tool and communicate to it. - You can pass the file name of the ``exiftool`` executable as an - argument to the constructor. The default value ``exiftool`` will - only work if the executable is in your ``PATH``. + You can pass two arguments to the constructor: + - ``addedargs`` (list of strings): contains additional paramaters for + the stay-open instance of exiftool + - ``executable`` (string): file name of the ``exiftool`` executable. + The default value ``exiftool`` will only work if the executable + is in your ``PATH`` Most methods of this class are only available after calling :py:meth:`start()`, which will actually launch the subprocess. To @@ -143,11 +191,20 @@ class ExifTool(object): associated with a running subprocess. """ - def __init__(self, executable_=None): + def __init__(self, executable_=None, addedargs=None): + if executable_ is None: self.executable = executable else: self.executable = executable_ + + if addedargs is None: + self.addedargs = [] + elif type(addedargs) is list: + self.addedargs = addedargs + else: + raise TypeError("addedargs not a list of strings") + self.running = False def start(self): @@ -162,9 +219,12 @@ def start(self): warnings.warn("ExifTool already running; doing nothing.") return with open(os.devnull, "w") as devnull: + procargs = [self.executable, "-stay_open", "True", "-@", "-", + "-common_args", "-G", "-n"]; + procargs.extend(self.addedargs) + logging.debug(procargs) self._process = subprocess.Popen( - [self.executable, "-stay_open", "True", "-@", "-", - "-common_args", "-G", "-n"], + procargs, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=devnull) self.running = True @@ -213,7 +273,8 @@ def execute(self, *params): """ if not self.running: raise ValueError("ExifTool instance not running.") - self._process.stdin.write(b"\n".join(params + (b"-execute\n",))) + cmd_txt = b"\n".join(params + (b"-execute\n",)) + self._process.stdin.write(cmd_txt.encode("utf-8")) self._process.stdin.flush() output = b"" fd = self._process.stdout.fileno() @@ -228,10 +289,11 @@ def execute_json(self, *params): automatically adds the parameter ``-j`` to request JSON output from ``exiftool`` and parses the output. The return value is a list of dictionaries, mapping tag names to the corresponding - values. All keys are Unicode strings with the tag names, + values. All keys are Unicode strings with the tag names including the ExifTool group name in the format :. The values can have multiple types. All strings occurring as - values will be Unicode strings. + values will be Unicode strings. Each dictionary contains the + name of the file it corresponds to in the key ``"SourceFile"``. The parameters to this function must be either raw strings (type ``str`` in Python 2.x, type ``bytes`` in Python 3.x) or @@ -317,3 +379,92 @@ def get_tag(self, tag, filename): ``None`` if this tag was not found in the file. """ return self.get_tag_batch(tag, [filename])[0] + + def set_tags_batch(self, tags, filenames): + """Writes the values of the specified tags for the given files. + + The first argument is a dictionary of tags and values. The tag names may + include group names, as usual in the format :. + + The second argument is an iterable of file names. + + The format of the return value is the same as for + :py:meth:`execute()`. + + It can be passed into `check_ok()` and `format_error()`. + """ + # Explicitly ruling out strings here because passing in a + # string would lead to strange and hard-to-find errors + if isinstance(tags, basestring): + raise TypeError("The argument 'tags' must be dictionary " + "of strings") + if isinstance(filenames, basestring): + raise TypeError("The argument 'filenames' must be " + "an iterable of strings") + + params = [] + for tag, value in tags.items(): + params.append(u'-%s=%s' % (tag, value)) + + params.extend(filenames) + logging.debug (params) + return self.execute(*params) + + def set_tags(self, tags, filename): + """Writes the values of the specified tags for the given file. + + This is a convenience function derived from `set_tags_batch()`. + Only difference is that it takes as last arugemnt only one file name + as a string. + """ + return self.set_tags_batch(tags, [filename]) + + def set_keywords_batch(self, mode, keywords, filenames): + """Modifies the keywords tag for the given files. + + The first argument is the operation mode: + KW_REPLACE: Replace (i.e. set) the full keywords tag with `keywords`. + KW_ADD: Add `keywords` to the keywords tag. + If a keyword is present, just keep it. + KW_REMOVE: Remove `keywords` from the keywords tag. + If a keyword wasn't present, just leave it. + + The second argument is an iterable of key words. + + The third argument is an iterable of file names. + + The format of the return value is the same as for + :py:meth:`execute()`. + + It can be passed into `check_ok()` and `format_error()`. + """ + # Explicitly ruling out strings here because passing in a + # string would lead to strange and hard-to-find errors + if isinstance(keywords, basestring): + raise TypeError("The argument 'keywords' must be " + "an iterable of strings") + if isinstance(filenames, basestring): + raise TypeError("The argument 'filenames' must be " + "an iterable of strings") + + params = [] + + kw_operation = {KW_REPLACE:"-%s=%s", + KW_ADD:"-%s+=%s", + KW_REMOVE:"-%s-=%s"}[mode] + + kw_params = [ kw_operation % (KW_TAGNAME, w) for w in keywords ] + + params.extend(kw_params) + params.extend(filenames) + logging.debug (params) + return self.execute(*params) + + def set_keywords(self, mode, keywords, filename): + """Modifies the keywords tag for the given file. + + This is a convenience function derived from `set_keywords_batch()`. + Only difference is that it takes as last argument only one file name + as a string. + """ + return self.set_keywords_batch(mode, keywords, [filename]) diff --git a/test/test_exiftool.py b/test/test_exiftool.py index 3542632..b656495 100644 --- a/test/test_exiftool.py +++ b/test/test_exiftool.py @@ -1,13 +1,16 @@ # -*- coding: utf-8 -*- +from __future__ import unicode_literals + import unittest import exiftool import warnings import os +import shutil class TestExifTool(unittest.TestCase): def setUp(self): - self.et = exiftool.ExifTool() + self.et = exiftool.ExifTool(addedargs=["-overwrite_original"]) def tearDown(self): if hasattr(self, "et"): self.et.terminate() @@ -43,16 +46,13 @@ def test_termination_implicit(self): self.process = self.et._process del self.et self.assertNotEqual(self.process.poll(), None) + def test_get_metadata(self): - try: # Py3k compatibility - roeschen = "Röschen".decode("utf-8") - except AttributeError: - roeschen = "Röschen" expected_data = [{"SourceFile": "rose.jpg", "File:FileType": "JPEG", "File:ImageWidth": 70, "File:ImageHeight": 46, - "XMP:Subject": roeschen, + "XMP:Subject": "Röschen", "Composite:ImageSize": "70x46"}, {"SourceFile": "skyblue.png", "File:FileType": "PNG", @@ -82,7 +82,55 @@ def test_get_metadata(self): tags0["SourceFile"] = os.path.normpath(tags0["SourceFile"]) self.assertEqual(tags0, dict((k, expected_data[0][k]) for k in ["SourceFile", "XMP:Subject"])) - self.assertEqual(tag0, roeschen) + self.assertEqual(tag0, "Röschen") + + def test_set_metadata(self): + mod_prefix = "newcap_" + expected_data = [{"SourceFile": "rose.jpg", + "Caption-Abstract": "Ein Röschen ganz allein"}, + {"SourceFile": "skyblue.png", + "Caption-Abstract": "Blauer Himmel"}] + script_path = os.path.dirname(__file__) + source_files = [] + for d in expected_data: + d["SourceFile"] = f = os.path.join(script_path, d["SourceFile"]) + self.assertTrue(os.path.exists(f)) + f_mod = os.path.join(os.path.dirname(f), mod_prefix + os.path.basename(f)) + self.assertFalse(os.path.exists(f_mod), "%s should not exist before the test. Please delete." % f_mod) + shutil.copyfile(f, f_mod) + source_files.append(f_mod) + with self.et: + self.et.set_tags({"Caption-Abstract":d["Caption-Abstract"]}, f_mod) + tag0 = self.et.get_tag("IPTC:Caption-Abstract", f_mod) + os.remove(f_mod) + self.assertEqual(tag0, d["Caption-Abstract"]) + + def test_set_keywords(self): + kw_to_add = ["added"] + mod_prefix = "newkw_" + expected_data = [{"SourceFile": "rose.jpg", + "Keywords": ["nature", "red plant"]}] + script_path = os.path.dirname(__file__) + source_files = [] + for d in expected_data: + d["SourceFile"] = f = os.path.join(script_path, d["SourceFile"]) + self.assertTrue(os.path.exists(f)) + f_mod = os.path.join(os.path.dirname(f), mod_prefix + os.path.basename(f)) + self.assertFalse(os.path.exists(f_mod), "%s should not exist before the test. Please delete." % f_mod) + shutil.copyfile(f, f_mod) + source_files.append(f_mod) + with self.et: + self.et.set_keywords(exiftool.KW_REPLACE, d["Keywords"], f_mod) + kwtag0 = self.et.get_tag("IPTC:Keywords", f_mod) + kwrest = d["Keywords"][1:] + self.et.set_keywords(exiftool.KW_REMOVE, kwrest, f_mod) + kwtag1 = self.et.get_tag("IPTC:Keywords", f_mod) + self.et.set_keywords(exiftool.KW_ADD, kw_to_add, f_mod) + kwtag2 = self.et.get_tag("IPTC:Keywords", f_mod) + os.remove(f_mod) + self.assertEqual(kwtag0, d["Keywords"]) + self.assertEqual(kwtag1, d["Keywords"][0]) + self.assertEqual(kwtag2, [d["Keywords"][0]] + kw_to_add) if __name__ == '__main__': unittest.main()