#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017-2018 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the AWIPS Tiled writer."""
import logging
import os
import warnings
from datetime import datetime, timedelta
from glob import glob
import dask
import dask.array as da
import numpy as np
import pytest
import xarray as xr
from pyproj import CRS
from satpy.resample import update_resampled_coords
def _check_production_location(ds):
if 'production_site' in ds.attrs:
prod_loc_name = 'production_site'
elif 'production_location' in ds.attrs:
prod_loc_name = 'producton_location'
else:
return
if prod_loc_name in ds.attrs:
assert len(ds.attrs[prod_loc_name]) == 31
[docs]def check_required_properties(unmasked_ds, masked_ds):
"""Check various aspects of coordinates and attributes for correctness."""
_check_scaled_x_coordinate_variable(unmasked_ds, masked_ds)
_check_scaled_y_coordinate_variable(unmasked_ds, masked_ds)
_check_required_common_attributes(unmasked_ds)
def _check_required_common_attributes(ds):
"""Check common properties of the created AWIPS tiles for validity."""
for attr_name in ('tile_row_offset', 'tile_column_offset',
'product_tile_height', 'product_tile_width',
'number_product_tiles',
'product_rows', 'product_columns'):
assert attr_name in ds.attrs
_check_production_location(ds)
for data_arr in ds.data_vars.values():
if data_arr.ndim == 0:
# grid mapping variable
assert 'grid_mapping_name' in data_arr.attrs
continue
assert data_arr.encoding.get('zlib', False)
assert 'grid_mapping' in data_arr.attrs
assert data_arr.attrs['grid_mapping'] in ds
def _check_scaled_x_coordinate_variable(ds, masked_ds):
assert 'x' in ds.coords
x_coord = ds.coords['x']
np.testing.assert_equal(np.diff(x_coord), 1)
x_attrs = x_coord.attrs
assert x_attrs.get('standard_name') == 'projection_x_coordinate'
assert x_attrs.get('units') == 'meter'
assert 'scale_factor' in x_attrs
assert x_attrs['scale_factor'] > 0
assert 'add_offset' in x_attrs
unscaled_x = masked_ds.coords['x'].values
assert (np.diff(unscaled_x) > 0).all()
def _check_scaled_y_coordinate_variable(ds, masked_ds):
assert 'y' in ds.coords
y_coord = ds.coords['y']
np.testing.assert_equal(np.diff(y_coord), 1)
y_attrs = y_coord.attrs
assert y_attrs.get('standard_name') == 'projection_y_coordinate'
assert y_attrs.get('units') == 'meter'
assert 'scale_factor' in y_attrs
assert y_attrs['scale_factor'] < 0
assert 'add_offset' in y_attrs
unscaled_y = masked_ds.coords['y'].values
assert (np.diff(unscaled_y) < 0).all()
[docs]class TestAWIPSTiledWriter:
"""Test basic functionality of AWIPS Tiled writer."""
[docs] def setup_method(self):
"""Create temporary directory to save files to."""
import tempfile
self.base_dir = tempfile.mkdtemp()
self.start_time = datetime(2018, 1, 1, 12, 0, 0)
self.end_time = self.start_time + timedelta(minutes=20)
[docs] def teardown_method(self):
"""Remove the temporary directory created for a test."""
try:
import shutil
shutil.rmtree(self.base_dir, ignore_errors=True)
except OSError:
pass
[docs] def test_init(self):
"""Test basic init method of writer."""
from satpy.writers.awips_tiled import AWIPSTiledWriter
AWIPSTiledWriter(base_dir=self.base_dir)
def _get_test_area(self, shape=(200, 100), crs=None, extents=None):
from pyresample.geometry import AreaDefinition
if crs is None:
crs = CRS('+proj=lcc +datum=WGS84 +ellps=WGS84 +lon_0=-95. +lat_0=25 +lat_1=25 +units=m +no_defs')
if extents is None:
extents = (-1000., -1500., 1000., 1500.)
area_def = AreaDefinition(
'test',
'test',
'test',
crs,
shape[1],
shape[0],
extents,
)
return area_def
def _get_test_data(self, shape=(200, 100), chunks=50):
data = np.linspace(0., 1., shape[0] * shape[1], dtype=np.float32).reshape(shape)
return da.from_array(data, chunks=chunks)
def _get_test_lcc_data(self, dask_arr, area_def):
ds = xr.DataArray(
dask_arr,
dims=('y', 'x') if dask_arr.ndim == 2 else ('bands', 'y', 'x'),
attrs=dict(
name='test_ds',
platform_name='PLAT',
sensor='SENSOR',
units='1',
standard_name='toa_bidirectional_reflectance',
area=area_def,
start_time=self.start_time,
end_time=self.end_time)
)
return update_resampled_coords(ds, ds, area_def)
[docs] @pytest.mark.parametrize('use_save_dataset',
[(False,), (True,)])
def test_basic_numbered_1_tile(self, use_save_dataset, caplog):
"""Test creating a single numbered tile."""
from satpy.writers.awips_tiled import AWIPSTiledWriter
data = self._get_test_data()
area_def = self._get_test_area()
input_data_arr = self._get_test_lcc_data(data, area_def)
with caplog.at_level(logging.DEBUG):
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
if use_save_dataset:
w.save_dataset(input_data_arr, sector_id='TEST', source_name='TESTS')
else:
w.save_datasets([input_data_arr], sector_id='TEST', source_name='TESTS')
assert "no routine matching" not in caplog.text
assert "Can't format string" not in caplog.text
all_files = glob(os.path.join(self.base_dir, 'TESTS_AII*.nc'))
assert len(all_files) == 1
assert os.path.basename(all_files[0]) == 'TESTS_AII_PLAT_SENSOR_test_ds_TEST_T001_20180101_1200.nc'
for fn in all_files:
unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
output_ds = xr.open_dataset(fn, mask_and_scale=True)
check_required_properties(unmasked_ds, output_ds)
scale_factor = output_ds['data'].encoding['scale_factor']
np.testing.assert_allclose(input_data_arr.values, output_ds['data'].data,
atol=scale_factor / 2)
[docs] def test_units_length_warning(self):
"""Test long 'units' warnings are raised."""
from satpy.writers.awips_tiled import AWIPSTiledWriter
data = self._get_test_data()
area_def = self._get_test_area()
input_data_arr = self._get_test_lcc_data(data, area_def)
input_data_arr.attrs["units"] = "this is a really long units string"
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
with warnings.catch_warnings(record=True) as caught_warnings:
w.save_dataset(input_data_arr, sector_id='TEST', source_name='TESTS')
assert len(caught_warnings) == 1
warn_msg = caught_warnings[0].message.args[0]
assert "too long" in warn_msg
assert "this is a really long units string" in warn_msg
[docs] @pytest.mark.parametrize(
("tile_count", "tile_size"),
[
((3, 3), None),
(None, (67, 34)),
(None, None),
]
)
def test_basic_numbered_tiles(self, tile_count, tile_size):
"""Test creating a multiple numbered tiles."""
from satpy.tests.utils import CustomScheduler
from satpy.writers.awips_tiled import AWIPSTiledWriter
data = self._get_test_data()
area_def = self._get_test_area()
input_data_arr = self._get_test_lcc_data(data, area_def)
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
save_kwargs = dict(
sector_id='TEST',
source_name="TESTS",
tile_count=tile_count,
tile_size=tile_size,
extra_global_attrs={'my_global': 'TEST'}
)
should_error = tile_count is None and tile_size is None
if should_error:
with dask.config.set(scheduler=CustomScheduler(0)),\
pytest.raises(ValueError, match=r'Either.*tile_count.*'):
w.save_datasets([input_data_arr], **save_kwargs)
else:
with dask.config.set(scheduler=CustomScheduler(1 * 2)): # precompute=*2
w.save_datasets([input_data_arr], **save_kwargs)
all_files = glob(os.path.join(self.base_dir, 'TESTS_AII*.nc'))
expected_num_files = 0 if should_error else 9
assert len(all_files) == expected_num_files
for fn in all_files:
unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
masked_ds = xr.open_dataset(fn, mask_and_scale=True)
check_required_properties(unmasked_ds, masked_ds)
assert unmasked_ds.attrs['my_global'] == 'TEST'
assert unmasked_ds.attrs['sector_id'] == 'TEST'
assert 'physical_element' in unmasked_ds.attrs
stime = input_data_arr.attrs['start_time']
assert unmasked_ds.attrs['start_date_time'] == stime.strftime('%Y-%m-%dT%H:%M:%S')
[docs] def test_basic_lettered_tiles(self):
"""Test creating a lettered grid."""
import xarray as xr
from satpy.writers.awips_tiled import AWIPSTiledWriter
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
data = self._get_test_data(shape=(2000, 1000), chunks=500)
area_def = self._get_test_area(shape=(2000, 1000),
extents=(-1000000., -1500000., 1000000., 1500000.))
ds = self._get_test_lcc_data(data, area_def)
# tile_count should be ignored since we specified lettered_grid
w.save_datasets([ds], sector_id='LCC', source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
all_files = glob(os.path.join(self.base_dir, 'TESTS_AII*.nc'))
assert len(all_files) == 16
for fn in all_files:
unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
masked_ds = xr.open_dataset(fn, mask_and_scale=True)
check_required_properties(unmasked_ds, masked_ds)
assert masked_ds.attrs['start_date_time'] == self.start_time.strftime('%Y-%m-%dT%H:%M:%S')
[docs] def test_basic_lettered_tiles_diff_projection(self):
"""Test creating a lettered grid from data with differing projection.."""
import xarray as xr
from satpy.writers.awips_tiled import AWIPSTiledWriter
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
crs = CRS("+proj=lcc +datum=WGS84 +ellps=WGS84 +lon_0=-95. +lat_0=45 +lat_1=45 +units=m +no_defs")
data = self._get_test_data(shape=(2000, 1000), chunks=500)
area_def = self._get_test_area(shape=(2000, 1000), crs=crs,
extents=(-1000000., -1500000., 1000000., 1500000.))
ds = self._get_test_lcc_data(data, area_def)
# tile_count should be ignored since we specified lettered_grid
w.save_datasets([ds], sector_id='LCC', source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
all_files = sorted(glob(os.path.join(self.base_dir, 'TESTS_AII*.nc')))
assert len(all_files) == 24
assert "TC02" in all_files[0] # the first tile should be TC02
for fn in all_files:
unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
masked_ds = xr.open_dataset(fn, mask_and_scale=True)
check_required_properties(unmasked_ds, masked_ds)
assert masked_ds.attrs['start_date_time'] == self.start_time.strftime('%Y-%m-%dT%H:%M:%S')
[docs] def test_lettered_tiles_update_existing(self):
"""Test updating lettered tiles with additional data."""
import shutil
import dask
import xarray as xr
from satpy.writers.awips_tiled import AWIPSTiledWriter
first_base_dir = os.path.join(self.base_dir, 'first')
w = AWIPSTiledWriter(base_dir=first_base_dir, compress=True)
shape = (2000, 1000)
data = np.linspace(0., 1., shape[0] * shape[1], dtype=np.float32).reshape(shape)
# pixels to be filled in later
data[:, -200:] = np.nan
data = da.from_array(data, chunks=500)
area_def = self._get_test_area(shape=(2000, 1000),
extents=(-1000000., -1500000., 1000000., 1500000.))
ds = self._get_test_lcc_data(data, area_def)
# tile_count should be ignored since we specified lettered_grid
w.save_datasets([ds], sector_id='LCC', source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
all_files = sorted(glob(os.path.join(first_base_dir, 'TESTS_AII*.nc')))
assert len(all_files) == 16
first_files = []
second_base_dir = os.path.join(self.base_dir, 'second')
os.makedirs(second_base_dir)
for fn in all_files:
new_fn = fn.replace(first_base_dir, second_base_dir)
shutil.copy(fn, new_fn)
first_files.append(new_fn)
# Second writing/updating
# Area is about 100 pixels to the right
area_def2 = self._get_test_area(shape=(2000, 1000),
extents=(-800000., -1500000., 1200000., 1500000.))
data2 = np.linspace(0., 1., 2000000, dtype=np.float32).reshape((2000, 1000))
# a gap at the beginning where old values remain
data2[:, :200] = np.nan
# a gap at the end where old values remain
data2[:, -400:-300] = np.nan
data2 = da.from_array(data2, chunks=500)
ds2 = self._get_test_lcc_data(data2, area_def2)
w = AWIPSTiledWriter(base_dir=second_base_dir, compress=True)
# HACK: The _copy_to_existing function hangs when opening the output
# file multiple times...sometimes. If we limit dask to one worker
# it seems to work fine.
with dask.config.set(num_workers=1):
w.save_datasets([ds2], sector_id='LCC', source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
all_files = glob(os.path.join(second_base_dir, 'TESTS_AII*.nc'))
# 16 original tiles + 4 new tiles
assert len(all_files) == 20
# these tiles should be the right-most edge of the first image
first_right_edge_files = [x for x in first_files if 'P02' in x or 'P04' in x or 'V02' in x or 'V04' in x]
for new_file in first_right_edge_files:
orig_file = new_file.replace(second_base_dir, first_base_dir)
orig_nc = xr.open_dataset(orig_file)
orig_data = orig_nc['data'].values
if not np.isnan(orig_data).any():
# we only care about the tiles that had NaNs originally
continue
new_nc = xr.open_dataset(new_file)
new_data = new_nc['data'].values
# there should be at least some areas of the file
# that old data was present and hasn't been replaced
np.testing.assert_allclose(orig_data[:, :20], new_data[:, :20])
# it isn't exactly 200 because the tiles aren't aligned with the
# data (the left-most tile doesn't have data until some columns
# in), but it should be at least that many columns
assert np.isnan(orig_data[:, 200:]).all()
assert not np.isnan(new_data[:, 200:]).all()
[docs] def test_lettered_tiles_sector_ref(self):
"""Test creating a lettered grid using the sector as reference."""
import xarray as xr
from satpy.writers.awips_tiled import AWIPSTiledWriter
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
data = self._get_test_data(shape=(2000, 1000), chunks=500)
area_def = self._get_test_area(shape=(2000, 1000),
extents=(-1000000., -1500000., 1000000., 1500000.))
ds = self._get_test_lcc_data(data, area_def)
w.save_datasets([ds], sector_id='LCC', source_name="TESTS",
lettered_grid=True, use_sector_reference=True,
use_end_time=True)
all_files = glob(os.path.join(self.base_dir, 'TESTS_AII*.nc'))
assert len(all_files) == 16
for fn in all_files:
unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
masked_ds = xr.open_dataset(fn, mask_and_scale=True)
check_required_properties(unmasked_ds, masked_ds)
expected_start = (self.start_time + timedelta(minutes=20)).strftime('%Y-%m-%dT%H:%M:%S')
assert masked_ds.attrs['start_date_time'] == expected_start
[docs] def test_lettered_tiles_no_fit(self):
"""Test creating a lettered grid with no data overlapping the grid."""
from satpy.writers.awips_tiled import AWIPSTiledWriter
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
data = self._get_test_data(shape=(2000, 1000), chunks=500)
area_def = self._get_test_area(shape=(2000, 1000),
extents=(4000000., 5000000., 5000000., 6000000.))
ds = self._get_test_lcc_data(data, area_def)
w.save_datasets([ds], sector_id='LCC', source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
# No files created
all_files = glob(os.path.join(self.base_dir, 'TESTS_AII*.nc'))
assert not all_files
[docs] def test_lettered_tiles_no_valid_data(self):
"""Test creating a lettered grid with no valid data."""
from satpy.writers.awips_tiled import AWIPSTiledWriter
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
data = da.full((2000, 1000), np.nan, chunks=500, dtype=np.float32)
area_def = self._get_test_area(shape=(2000, 1000),
extents=(-1000000., -1500000., 1000000., 1500000.))
ds = self._get_test_lcc_data(data, area_def)
w.save_datasets([ds], sector_id='LCC', source_name="TESTS", tile_count=(3, 3), lettered_grid=True)
# No files created - all NaNs should result in no tiles being created
all_files = glob(os.path.join(self.base_dir, 'TESTS_AII*.nc'))
assert not all_files
[docs] def test_lettered_tiles_bad_filename(self):
"""Test creating a lettered grid with a bad filename."""
from satpy.writers.awips_tiled import AWIPSTiledWriter
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True, filename="{Bad Key}.nc")
data = self._get_test_data(shape=(2000, 1000), chunks=500)
area_def = self._get_test_area(shape=(2000, 1000),
extents=(-1000000., -1500000., 1000000., 1500000.))
ds = self._get_test_lcc_data(data, area_def)
with pytest.raises(KeyError):
w.save_datasets([ds],
sector_id='LCC',
source_name='TESTS',
tile_count=(3, 3),
lettered_grid=True)
[docs] def test_basic_numbered_tiles_rgb(self):
"""Test creating a multiple numbered tiles with RGB."""
import xarray as xr
from satpy.writers.awips_tiled import AWIPSTiledWriter
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
data = da.from_array(np.linspace(0., 1., 60000, dtype=np.float32).reshape((3, 200, 100)), chunks=50)
area_def = self._get_test_area()
ds = self._get_test_lcc_data(data, area_def)
ds = ds.rename(dict((old, new) for old, new in zip(ds.dims, ['bands', 'y', 'x'])))
ds.coords['bands'] = ['R', 'G', 'B']
w.save_datasets([ds], sector_id='TEST', source_name="TESTS", tile_count=(3, 3))
chan_files = glob(os.path.join(self.base_dir, 'TESTS_AII*test_ds_R*.nc'))
all_files = chan_files[:]
assert len(chan_files) == 9
chan_files = glob(os.path.join(self.base_dir, 'TESTS_AII*test_ds_G*.nc'))
all_files.extend(chan_files)
assert len(chan_files) == 9
chan_files = glob(os.path.join(self.base_dir, 'TESTS_AII*test_ds_B*.nc'))
assert len(chan_files) == 9
all_files.extend(chan_files)
for fn in all_files:
unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
masked_ds = xr.open_dataset(fn, mask_and_scale=True)
check_required_properties(unmasked_ds, masked_ds)
[docs] @pytest.mark.parametrize(
"sector",
['C',
'F']
)
@pytest.mark.parametrize(
"extra_kwargs",
[
{},
{'environment_prefix': 'AA'},
{'environment_prefix': 'BB', 'filename': '{environment_prefix}_{name}_GLM_T{tile_number:04d}.nc'},
]
)
def test_multivar_numbered_tiles_glm(self, sector, extra_kwargs):
"""Test creating a tiles with multiple variables."""
import os
import xarray as xr
from satpy.writers.awips_tiled import AWIPSTiledWriter
os.environ['ORGANIZATION'] = '1' * 50
w = AWIPSTiledWriter(base_dir=self.base_dir, compress=True)
data = self._get_test_data()
area_def = self._get_test_area()
ds1 = self._get_test_lcc_data(data, area_def)
ds1.attrs.update(
dict(
name='total_energy',
platform_name='GOES-17',
sensor='SENSOR',
units='1',
scan_mode='M3',
scene_abbr=sector,
platform_shortname="G17"
)
)
ds2 = ds1.copy()
ds2.attrs.update({
'name': 'flash_extent_density',
})
ds3 = ds1.copy()
ds3.attrs.update({
'name': 'average_flash_area',
})
dqf = ds1.copy()
dqf = (dqf * 255).astype(np.uint8)
dqf.attrs = ds1.attrs.copy()
dqf.attrs.update({
'name': 'DQF',
'_FillValue': 1,
})
w.save_datasets([ds1, ds2, ds3, dqf], sector_id='TEST', source_name="TESTS",
tile_count=(3, 3), template='glm_l2_rad{}'.format(sector.lower()),
**extra_kwargs)
fn_glob = self._get_glm_glob_filename(extra_kwargs)
all_files = glob(os.path.join(self.base_dir, fn_glob))
assert len(all_files) == 9
for fn in all_files:
unmasked_ds = xr.open_dataset(fn, mask_and_scale=False)
masked_ds = xr.open_dataset(fn, mask_and_scale=True)
check_required_properties(unmasked_ds, masked_ds)
if sector == 'C':
assert masked_ds.attrs['time_coverage_end'] == self.end_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
else: # 'F'
assert masked_ds.attrs['time_coverage_end'] == self.end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
@staticmethod
def _get_glm_glob_filename(extra_kwargs):
if 'filename' in extra_kwargs:
return 'BB*_GLM*.nc'
elif 'environment_prefix' in extra_kwargs:
return 'AA*_GLM*.nc'
return 'DR*_GLM*.nc'