Source code for espatools.read

"""This module holds the file I/O methods for rasters and bands."""

__all__ = [
    'set_properties',
    'RasterSetReader',
]

import xmltodict
import numpy as np
from PIL import Image
import os
import collections
import properties

from .raster import RasterSet, Band


[docs]def set_properties(has_props_cls, input_dict, include_immutable=True): """A helper method to set an ``HasProperties`` object's properties from a dictionary""" props = has_props_cls() if not isinstance(input_dict, (dict, collections.OrderedDict)): raise RuntimeError('input_dict invalid: ', input_dict) for k, v in iter(input_dict.items()): if (k in has_props_cls._props and ( include_immutable or any(hasattr(has_props_cls._props[k], att) for att in ('required', 'new_name')) ) ): p = props._props.get(k) if isinstance(p, properties.HasProperties): props._set(k, set_properties(p, v, include_immutable=include_immutable)) elif isinstance(p, properties.Instance): props._set(k, set_properties(p.instance_class, v, include_immutable=include_immutable)) elif isinstance(p, properties.List): if not isinstance(v, list): raise RuntimeError('property value mismatch', p, v) if not isinstance(v[0], properties.HasProperties): prop = p.prop.instance_class newlist = [] for i in v: value = set_properties(prop, i, include_immutable=include_immutable) newlist.append(value) props._set(k, newlist) else: props._set(k, v) else: props._set(k, p.from_json(v)) # Return others as well # others_dict = {k: v for k, v in iter(input_dict.items()) # if k not in has_props_cls._props} return props #, others_dict
[docs]class RasterSetReader(object): """Read a series of raster files via their XML metadata file in ESPA schema""" def __init__(self, **kwargs): self.filename = kwargs.get('filename', None) self.yflip = kwargs.get('yflip', False) self.bdict = dict()
[docs] @staticmethod def read_tif(tifFile): """Reads a tif file to a 2D NumPy array""" img = Image.open(tifFile) img = np.array(img) return img
[docs] @staticmethod def clean_dict(d): d = {key.replace('@', '').replace('#', ''): item for key, item in d.items()} for key, item in d.items(): if isinstance(item, (collections.OrderedDict, dict)): d[key] = RasterSetReader.clean_dict(item) elif isinstance(item, list): for i in range(len(item)): if isinstance(item[i], (collections.OrderedDict, dict)): item[i] = RasterSetReader.clean_dict(item[i]) return d
[docs] def generate_band(self, band, meta_only=False, cast=False): """Genreate a Band object given band metadata Args: band (dict): dictionary containing metadata for a given band Return: Band : the loaded Band onject""" # Read the band data and add it to dictionary if not meta_only: fname = band.get('file_name') data = self.read_tif('%s/%s' % (os.path.dirname(self.filename), fname)) # band['data'] = data # TODO: data is not a properties object so do not set yet def fix_bitmap(d): p = d.get('bitmap_description') if p: lis = p.get('bit') bm = dict() # Fix bitmap_description from list of dicts to one dict for i in lis: key = i['num'] value = i['text'] bm[key] = value del d['bitmap_description'] d['bitmap_description'] = bm return d band = set_properties(Band, fix_bitmap(self.clean_dict(band))) if not meta_only: if cast: # cast as floats and fill bad values with nans data = data.astype(np.float32) data[data==band.fill_value] = -9999 if band.valid_range is not None: data[data<band.valid_range.min] = -9999 data[data>band.valid_range.max] = -9999 data[data==-9999] = np.nan else: data = np.ma.masked_where(data==band.fill_value, data) if band.valid_range is not None: data = np.ma.masked_where(data<band.valid_range.min, data) data = np.ma.masked_where(data>band.valid_range.max, data) # Flip y axis if requested if self.yflip: data = np.flip(data, 0) band.data = data if not meta_only: band.validate() return band
[docs] def read(self, meta_only=False, allowed=None, cast=False): """Read the ESPA XML metadata file""" if allowed is not None and not isinstance(allowed, (list, tuple)): raise RuntimeError('`allowed` must be a list of str names.') meta = xmltodict.parse( open(self.filename, 'r').read() ).get('espa_metadata') # Handle bands seperately bands = meta.get('bands').get('band') del(meta['bands']) if not isinstance(bands, (list)): bands = [bands] meta = self.clean_dict(meta) # Get spatial refernce ras = set_properties(RasterSet, meta) if allowed is not None: # Remove non-allowed arrays from bdict for k in list(self.bdict.keys()): if k not in allowed: del(self.bdict[k]) for i in range(len(bands)): info = self.generate_band(bands[i], meta_only=True, cast=cast) if allowed is not None and info.name not in allowed: continue if info.name not in self.bdict.keys() or self.bdict[info.name].data is None: b = self.generate_band(bands[i], meta_only=meta_only, cast=cast) self.bdict[b.name] = b elif cast and self.bdict[info.name].data.dtype != np.float32: b = self.generate_band(bands[i], meta_only=meta_only, cast=cast) self.bdict[b.name] = b elif not cast and self.bdict[info.name].data.dtype == np.float32: b = self.generate_band(bands[i], meta_only=meta_only, cast=cast) self.bdict[b.name] = b ras.bands = self.bdict if not meta_only: ras.validate() return ras
[docs] def Read(self, *args, **kwargs): return self.read(*args, **kwargs)
[docs] def set_file_name(self, filename): self.filename = filename
[docs] def SetFileName(self, filename): return self.set_file_name(filename)