This module provides an API for image georeferencing,
map overlay, tif to png conversion and others.
It extracts the information from descriptor file and
warps the image to defined projection.

import argparse
import json
import os

from shutil import copyfile
from datetime import timedelta
from typing import List

import dateutil.parser as dparser
import matplotlib.image as mimg
import numpy as np
import tifffile

from PIL import Image
from osgeo import gdal
from osgeo.gdal import GRA_NearestNeighbour, GRA_Bilinear, GRA_Cubic
from geographiclib.geodesic import Geodesic
from pyorbital.orbital import Orbital
from directdemod import constants
from directdemod.misc import Encoder

[docs]class Georeferencer(object): """ This class provides an API for image georeferencing. It extracts the information from descriptor file, translates and warps the image to defined projection. """
[docs] def __init__(self, tle_file: str = ""): """Georeferencer constructor Args: tle_file (:obj:`string`, optional): file with orbit parameters """ self.tle_file = tle_file
[docs] def georef_tif(self, image_name: str, output_file: str, resample_alg=GRA_NearestNeighbour) -> None: """georeferences the satellite image from tif file using GDAL Python API. Descriptor is extracted directly from tif file Args: image_name (:obj:`string`): path to tiff file, which contains needed metadata output_file (:obj:`string`): path to output file resample_alg (:obj:`gdalconst`): resampling algorithm (nearest, bilinear, cubic) """ with tifffile.TiffFile(image_name) as tif: page = tif.pages[0] descriptor = page.tags["ImageDescription"].value descriptor = json.loads(descriptor) self.georef(descriptor, output_file, resample_alg)
[docs] def georef(self, descriptor: dict, output_file: str, resample_alg=GRA_NearestNeighbour) -> None: """georeferences the satellite image from descriptor file using GDAL Python API Args: descriptor (:obj:`dict`): descriptor dictionary output_file (:obj:`string`): name of the output file resample_alg (:obj:`gdalconst`, optional): algorithm for resampling """ file_name = descriptor["image_name"] image = mimg.imread(file_name) gcps = self.compute_gcps(descriptor, image) options = gdal.TranslateOptions(format="GTiff", outputSRS=constants.DEFAULT_RS, GCPs=gcps) gdal.Translate(destName=constants.TEMP_TIFF_FILE, srcDS=file_name, options=options) options = gdal.WarpOptions(srcSRS=constants.DEFAULT_RS, dstSRS=constants.DEFAULT_RS, tps=True, resampleAlg=resample_alg) gdal.Warp(destNameOrDestDS=output_file, srcDSOrSrcDSTab=constants.TEMP_TIFF_FILE, options=options) os.remove(constants.TEMP_TIFF_FILE)
[docs] def georef_os(self, descriptor: dict, output_file: str) -> None: """georeferences the satellite image from descriptor file, using GDAL compiled binaries. Can be used when gdal binaries are available only Args: descriptor (:obj:`dict`): descriptor dictionary output_file (:obj:`string`): name of the output file """ file_name = descriptor["image_name"] image = mimg.imread(file_name) gcps = self.compute_gcps(descriptor, image) translate_flags = "-of GTiff -a_srs EPSG:4326" warp_flags = "-r near -tps -s_srs EPSG:4326 -t_srs EPSG:4326" translate_query = 'gdal_translate ' + translate_flags + ' ' + self.to_string_gcps(gcps) \ + ' "' + file_name + '" ' + ' "' + constants.TEMP_TIFF_FILE + '"' warp_query = 'gdalwarp ' + warp_flags + ' "' + constants.TEMP_TIFF_FILE + '" ' + ' "' + output_file + '"' os.system(translate_query) os.system(warp_query) os.remove(constants.TEMP_TIFF_FILE)
[docs] @staticmethod def to_string_gcps(gcps: List[gdal.GCP]) -> str: """create string representation of gcp points Args: gcps (:obj:`list`): list of gcp points Returns: :obj:`string`: gcp points represented as a string """ return " ".join([ ("-gcp " + str(gcp.GCPPixel) + " " + str(gcp.GCPLine) + " " + str(gcp.GCPX) + " " + str(gcp.GCPY)) for gcp in gcps ])
[docs] @staticmethod def create_desc(descriptor: dict, output_file: str) -> None: """create descriptor for `output_file` file Args: descriptor (:obj:`dict`): descriptor dictionary output_file (:obj:`string`): name of the output file """ desc = { "image_name": output_file, "sat_type": descriptor["sat_type"], "date_time": descriptor["date_time"], "center": descriptor["center"], "direction": descriptor["direction"] } name, _ = os.path.splitext(output_file) desc_name = name + "_desc.json" json.dump(desc, open(desc_name, 'w'), cls=Encoder)
[docs] def compute_gcps(self, descriptor: dict, image: np.ndarray) -> List[gdal.GCP]: """compute set of Ground Control Points Args: descriptor (:obj:`dict`): descriptor dictionary, which describes the image image (:obj:`np.ndarray`): image as np.ndarray Returns: :obj:`list`: list of GCPs """ height = image.shape[0] width = image.shape[1] center_w = width / 2 gcps = [] dtime = dparser.parse(descriptor["date_time"]) - timedelta(seconds=180) orbiter = Orbital(descriptor["sat_type"], tle_file=self.tle_file) min_delta = 500 middle_dist = 3.25 * 455 / 2. * 1000 far_dist = 3.15 * 455 * 1000 # 3.15 is because of image distortions towards to boudaries prev_position = orbiter.get_lonlatalt(dtime - timedelta( milliseconds=min_delta * 10)) for i in range(0, height, 10): current_height = height - i - 1 gcp_time = dtime + timedelta(milliseconds=i * min_delta) position = orbiter.get_lonlatalt(gcp_time) gcps.append( gdal.GCP(position[0], position[1], 0, center_w, current_height)) for i in range(0, height, 100): current_height = height - i - 1 gcp_time = dtime + timedelta(milliseconds=i * min_delta) position = orbiter.get_lonlatalt(gcp_time) angle = self.compute_angle(prev_position[0], prev_position[1], position[0], position[1]) azimuth = 90 - angle gcps.append( self.compute_gcp(position[0], position[1], azimuth, middle_dist, 3 * width / 4, current_height)) gcps.append( self.compute_gcp(position[0], position[1], azimuth, far_dist, width, current_height)) # Note +2 degrees is hand constant gcps.append( self.compute_gcp(position[0], position[1], azimuth + 182, middle_dist, width / 4, current_height)) gcps.append( self.compute_gcp(position[0], position[1], azimuth + 182, far_dist, 0, current_height)) gcps.append( self.compute_gcp(position[0], position[1], azimuth, middle_dist / 2, 5 * width / 8, current_height)) gcps.append( self.compute_gcp(position[0], position[1], azimuth, 3 * middle_dist / 2, 7 * width / 8, current_height)) # Note +2 degrees is hand constant gcps.append( self.compute_gcp(position[0], position[1], azimuth + 182, middle_dist / 2, 3 * width / 8, current_height)) gcps.append( self.compute_gcp(position[0], position[1], azimuth + 182, 3 * middle_dist / 2, width / 8, current_height)) prev_position = position return gcps
[docs] @staticmethod def compute_gcp(long: float, lat: float, angle: float, distance: float, width: float, height: float) -> gdal.GCP: """compute coordinate of GCP, using longitude and latitude of starting point, azimuth angle and distance to the point Args: long (:obj:`float`): longitude of start point lat (:obj:`float`): latitude of start point angle (:obj:`float`): azimuth between start point and GCP distance (:obj: `float`): distance to point in meters width (:obj:`float`): w-axis coordinate height (:obj:`float`): height-axis coordinate Returns: :obj:`gdal.GCP`: instance of GCP object """ coords = Geodesic.WGS84.Direct(lat, long, angle, distance) return gdal.GCP(coords['lon2'], coords['lat2'], 0, width, height)
[docs] @staticmethod def compute_angle(long1: float, lat1: float, long2: float, lat2: float) -> float: """compute angle between 2 points, defined by latitude and longitude Args: long1 (:obj:`float`): longitude of start point lat1 (:obj:`float`): latitude of start point long2 (:obj:`float`): longitude of end point lat2 (:obj:`float`): latitude of end point Returns: :obj:`float`: angle between points """ lat1 = np.radians(lat1) long1 = np.radians(long1) lat2 = np.radians(lat2) long2 = np.radians(long2) d_lon = (long2 - long1) sincos = np.sin(d_lon) * np.cos(lat2) cosdiff = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos( lat2) * np.cos(d_lon) brng = np.arctan2(sincos, cosdiff) brng = np.degrees(brng) brng = (brng + 360) % 360 brng = 360 - brng return brng
def overlay(raster_path: str, shapefile: str = constants.BORDERS, grayscale: bool = True) -> None: """create map overlay of borders shape file over raster Args: raster_path (:obj:`string`): path to raster (.tif) shapefile (:obj:`string`): path to shape file (.shp) grayscale (:obj:`bool`): type of image Throws: :obj:`NotImplementedError`: if passed grayscale False """ if grayscale: vector_ds = gdal.OpenEx(shapefile, gdal.OF_VECTOR) raster_ds = gdal.Open(raster_path, gdal.GA_Update) gdal.Rasterize(raster_ds, vector_ds, bands=[1], burnValues=[255]) else: raise NotImplementedError() def tif_to_png(filename: str, png_file: str, grayscale: bool = True) -> None: """covert tif image to png Args: filename (:obj:`string`): path to image (.tif) png_file (:obj:`string`): name of output file (.png) grayscale (:obj:`bool`): type of image Throws: :obj:`NotImplementedError`: if passed grayscale False """ if grayscale: img ="LA") else: raise NotImplementedError def set_nodata(filename: str, value: int = 0) -> None: """sets no-data value of tif 'filename' to 'value' Args: filename (:obj:`string`): path to image (.tif) value (:obj:`int`): value to set as no-data value """ _set_nodata(filename, constants.TEMP_TIFF_FILE, value=value) os.remove(filename) copyfile(constants.TEMP_TIFF_FILE, filename) os.remove(constants.TEMP_TIFF_FILE) def _set_nodata(filename: str, output_file: str, value: int = 0) -> None: """sets no-data value of tif 'filename' to 'value', saves to output_file Args: filename (:obj:`string`): path to image (.tif) output_file (:obj:`string`): name of output file (.tif) value (:obj:`int`): value to set as no-data value """ options = gdal.TranslateOptions(format="GTiff", noData=value) gdal.Translate(destName=output_file, srcDS=filename, options=options) def main() -> None: """Georeferencer CLI interface""" parser = argparse.ArgumentParser(description="Noaa georeferencer.") parser.add_argument('-i', '--image_name', required=True) parser.add_argument('-o', '--output_file', required=False) parser.add_argument('-m', '--map', action='store_true') parser.add_argument('-r', '--resample', required=False) args = parser.parse_args() resample = args.resample if resample is None or resample == 'nearest': resample = GRA_NearestNeighbour elif resample == 'bilinear': resample = GRA_Bilinear elif resample == 'cubic': resample = GRA_Cubic else: raise ValueError( "ERROR: Invalid resample algorithm (nearest, bilinear, cubic): " + str(resample)) referencer = Georeferencer(tle_file=constants.TLE_NOAA) output_file = args.output_file if args.output_file is not None else args.image_name referencer.georef_tif(args.image_name, output_file, resample_alg=resample) if overlay(output_file) if __name__ == "__main__": main()