def run(self, layers):
"""Risk plugin for tsunami population evacuation.
:param layers: List of layers expected to contain
hazard_layer: Raster layer of tsunami depth
exposure_layer: Raster layer of population data on the same grid
as hazard_layer
Counts number of people exposed to tsunami levels exceeding
specified threshold.
:returns: Map of population exposed to tsunami levels exceeding the
threshold. Table with number of people evacuated and supplies
required.
:rtype: tuple
"""
# Identify hazard and exposure layers
hazard_layer = get_hazard_layer(layers) # Tsunami inundation [m]
exposure_layer = get_exposure_layer(layers)
question = get_question(
hazard_layer.get_name(), exposure_layer.get_name(), self)
# Determine depths above which people are regarded affected [m]
# Use thresholds from inundation layer if specified
thresholds = self.parameters['thresholds [m]']
verify(
isinstance(thresholds, list),
'Expected thresholds to be a list. Got %s' % str(thresholds))
# Extract data as numeric arrays
data = hazard_layer.get_data(nan=0.0) # Depth
# Calculate impact as population exposed to depths > max threshold
population = exposure_layer.get_data(nan=0.0, scaling=True)
# Calculate impact to intermediate thresholds
counts = []
# merely initialize
impact = None
for i, lo in enumerate(thresholds):
if i == len(thresholds) - 1:
# The last threshold
impact = medium = numpy.where(data >= lo, population, 0)
else:
# Intermediate thresholds
hi = thresholds[i + 1]
medium = numpy.where((data >= lo) * (data < hi), population, 0)
# Count
val = int(numpy.sum(medium))
# Sensible rounding
val, rounding = population_rounding_full(val)
counts.append([val, rounding])
# Count totals
evacuated, rounding = counts[-1]
total = int(numpy.sum(population))
# Don't show digits less than a 1000
total = population_rounding(total)
minimum_needs = [
parameter.serialize() for parameter in
self.parameters['minimum needs']
]
# Generate impact report for the pdf map
# noinspection PyListCreation
table_body = [
question,
TableRow([(tr('People in %.1f m of water') % thresholds[-1]),
'%s*' % format_int(evacuated)],
header=True),
TableRow(
tr('* Number is rounded up to the nearest %s') % rounding),
TableRow(tr('Map shows the numbers of people needing evacuation'))]
total_needs = evacuated_population_needs(
evacuated, minimum_needs)
for frequency, needs in total_needs.items():
table_body.append(TableRow(
[
tr('Needs should be provided %s' % frequency),
tr('Total')
],
header=True))
for resource in needs:
table_body.append(TableRow([
tr(resource['table name']),
format_int(resource['amount'])]))
table_body.append(TableRow(tr('Action Checklist:'), header=True))
table_body.append(TableRow(tr('How will warnings be disseminated?')))
table_body.append(TableRow(tr('How will we reach stranded people?')))
table_body.append(TableRow(tr('Do we have enough relief items?')))
table_body.append(TableRow(tr('If yes, where are they located and how '
'will we distribute them?')))
#.........这里部分代码省略.........
def run(self, layers):
"""Indonesian Earthquake Fatality Model
Input
layers: List of layers expected to contain
H: Raster layer of MMI ground shaking
P: Raster layer of population density
"""
# Establish model coefficients
x = self.parameters['x']
y = self.parameters['y']
# Define percentages of people being displaced at each mmi level
displacement_rate = self.parameters['displacement_rate']
# Tolerance for transparency
tolerance = self.parameters['tolerance']
# Extract input layers
intensity = get_hazard_layer(layers)
population = get_exposure_layer(layers)
question = get_question(intensity.get_name(),
population.get_name(),
self)
# Extract data grids
H = intensity.get_data() # Ground Shaking
P = population.get_data(scaling=True) # Population Density
# Calculate population affected by each MMI level
# FIXME (Ole): this range is 2-9. Should 10 be included?
mmi_range = range(2, 10)
number_of_exposed = {}
number_of_displaced = {}
number_of_fatalities = {}
# Calculate fatality rates for observed Intensity values (H
# based on ITB power model
R = numpy.zeros(H.shape)
for mmi in mmi_range:
# Identify cells where MMI is in class i
mask = (H > mmi - 0.5) * (H <= mmi + 0.5)
# Count population affected by this shake level
I = numpy.where(mask, P, 0)
# Calculate expected number of fatalities per level
fatality_rate = numpy.power(10.0, x * mmi - y)
F = fatality_rate * I
# Calculate expected number of displaced people per level
try:
D = displacement_rate[mmi] * I
except KeyError, e:
msg = 'mmi = %i, I = %s, Error msg: %s' % (mmi, str(I), str(e))
raise InaSAFEError(msg)
# Adjust displaced people to disregard fatalities.
# Set to zero if there are more fatalities than displaced.
D = numpy.where(D > F, D - F, 0)
# Sum up numbers for map
R += D # Displaced
# Generate text with result for this study
# This is what is used in the real time system exposure table
number_of_exposed[mmi] = numpy.nansum(I.flat)
number_of_displaced[mmi] = numpy.nansum(D.flat)
number_of_fatalities[mmi] = numpy.nansum(F.flat)
def run(self, layers):
"""Earthquake impact to buildings (e.g. from Open Street Map)
"""
LOGGER.debug('Running earthquake building impact')
# Thresholds for mmi breakdown
t0 = self.parameters['low_threshold']
t1 = self.parameters['medium_threshold']
t2 = self.parameters['high_threshold']
class_1 = tr('Low')
class_2 = tr('Medium')
class_3 = tr('High')
# Extract data
H = get_hazard_layer(layers) # Depth
E = get_exposure_layer(layers) # Building locations
question = get_question(H.get_name(),
E.get_name(),
self)
# Define attribute name for hazard levels
hazard_attribute = 'mmi'
# Determine if exposure data have NEXIS attributes
attribute_names = E.get_attribute_names()
if ('FLOOR_AREA' in attribute_names and
'BUILDING_C' in attribute_names and
'CONTENTS_C' in attribute_names):
is_NEXIS = True
else:
is_NEXIS = False
# Interpolate hazard level to building locations
I = assign_hazard_values_to_exposure_data(H, E,
attribute_name=hazard_attribute)
# Extract relevant exposure data
#attribute_names = I.get_attribute_names()
attributes = I.get_data()
N = len(I)
# Calculate building impact
lo = 0
me = 0
hi = 0
building_values = {}
contents_values = {}
for key in range(4):
building_values[key] = 0
contents_values[key] = 0
for i in range(N):
# Classify building according to shake level
# and calculate dollar losses
if is_NEXIS:
try:
area = float(attributes[i]['FLOOR_AREA'])
except (ValueError, KeyError):
#print 'Got area', attributes[i]['FLOOR_AREA']
area = 0.0
try:
building_value_density = float(attributes[i]['BUILDING_C'])
except (ValueError, KeyError):
#print 'Got bld value', attributes[i]['BUILDING_C']
building_value_density = 0.0
try:
contents_value_density = float(attributes[i]['CONTENTS_C'])
except (ValueError, KeyError):
#print 'Got cont value', attributes[i]['CONTENTS_C']
contents_value_density = 0.0
building_value = building_value_density * area
contents_value = contents_value_density * area
x = float(attributes[i][hazard_attribute]) # MMI
if t0 <= x < t1:
lo += 1
cls = 1
elif t1 <= x < t2:
me += 1
cls = 2
elif t2 <= x:
hi += 1
cls = 3
else:
# Not reported for less than level t0
cls = 0
attributes[i][self.target_field] = cls
if is_NEXIS:
# Accumulate values in 1M dollar units
building_values[cls] += building_value
#.........这里部分代码省略.........
def run(self, layers):
"""Impact plugin for hazard impact
"""
# Extract data
H = get_hazard_layer(layers) # Value
E = get_exposure_layer(layers) # Building locations
question = get_question(H.get_name(),
E.get_name(),
self)
# Interpolate hazard level to building locations
H = assign_hazard_values_to_exposure_data(H, E,
attribute_name='hazard_lev',
mode='constant')
# Extract relevant numerical data
coordinates = H.get_geometry()
category = H.get_data()
N = len(category)
# List attributes to carry forward to result layer
#attributes = E.get_attribute_names()
# Calculate building impact according to guidelines
count2 = 0
count1 = 0
count0 = 0
building_impact = []
for i in range(N):
# Get category value
val = float(category[i]['hazard_lev'])
# Classify buildings according to value
## if val >= 2.0 / 3:
## affected = 2
## count2 += 1
## elif 1.0 / 3 <= val < 2.0 / 3:
## affected = 1
## count1 += 1
## else:
## affected = 0
## count0 += 1
## FIXME it would be good if the affected were words not numbers
## FIXME need to read hazard layer and see category or keyword
if val == 3:
affected = 3
count2 += 1
elif val == 2:
affected = 2
count1 += 1
elif val == 1:
affected = 1
count0 += 1
else:
affected = 'None'
# Collect depth and calculated damage
result_dict = {self.target_field: affected,
'CATEGORY': val}
# Record result for this feature
building_impact.append(result_dict)
# Create impact report
# Generate impact summary
table_body = [question,
TableRow([tr('Category'), tr('Affected')],
header=True),
TableRow([tr('High'), format_int(count2)]),
TableRow([tr('Medium'), format_int(count1)]),
TableRow([tr('Low'), format_int(count0)]),
TableRow([tr('All'), format_int(N)])]
table_body.append(TableRow(tr('Notes'), header=True))
table_body.append(tr('Categorised hazard has only 3'
' classes, high, medium and low.'))
impact_summary = Table(table_body).toNewlineFreeString()
impact_table = impact_summary
map_title = tr('Categorised hazard impact on buildings')
#FIXME it would be great to do categorized rather than grduated
# Create style
style_classes = [dict(label=tr('Low'), min=1, max=1,
colour='#1EFC7C', transparency=0, size=1),
dict(label=tr('Medium'), min=2, max=2,
colour='#FFA500', transparency=0, size=1),
dict(label=tr('High'), min=3, max=3,
colour='#F31A1C', transparency=0, size=1)]
style_info = dict(target_field=self.target_field,
style_classes=style_classes)
# Create vector layer and return
name = 'Buildings Affected'
V = Vector(data=building_impact,
projection=E.get_projection(),
geometry=coordinates,
#.........这里部分代码省略.........
def run(self, layers):
"""Risk plugin for volcano hazard on building/structure.
Counts number of building exposed to each volcano hazard zones.
:param layers: List of layers expected to contain.
* hazard_layer: Hazard layer of volcano
* exposure_layer: Vector layer of structure data on
the same grid as hazard_layer
:returns: Map of building exposed to volcanic hazard zones.
Table with number of buildings affected
:rtype: dict
"""
# Identify hazard and exposure layers
hazard_layer = get_hazard_layer(layers) # Volcano hazard layer
exposure_layer = get_exposure_layer(layers)
is_point_data = False
question = get_question(
hazard_layer.get_name(), exposure_layer.get_name(), self)
# Input checks
if not hazard_layer.is_vector:
msg = ('Input hazard %s was not a vector layer as expected '
% hazard_layer.get_name())
raise Exception(msg)
msg = ('Input hazard must be a polygon or point layer. I got %s '
'with layer type %s' %
(hazard_layer.get_name(), hazard_layer.get_geometry_name()))
if not (hazard_layer.is_polygon_data or hazard_layer.is_point_data):
raise Exception(msg)
if hazard_layer.is_point_data:
# Use concentric circles
radii = self.parameters['distances [km]']
is_point_data = True
centers = hazard_layer.get_geometry()
attributes = hazard_layer.get_data()
rad_m = [x * 1000 for x in radii] # Convert to meters
hazard_layer = buffer_points(centers, rad_m, data_table=attributes)
# To check
category_title = 'Radius'
category_names = rad_m
name_attribute = 'NAME' # As in e.g. the Smithsonian dataset
else:
# Use hazard map
category_title = 'KRB'
# FIXME (Ole): Change to English and use translation system
category_names = ['Kawasan Rawan Bencana III',
'Kawasan Rawan Bencana II',
'Kawasan Rawan Bencana I']
name_attribute = 'GUNUNG' # As in e.g. BNPB hazard map
# Get names of volcanoes considered
if name_attribute in hazard_layer.get_attribute_names():
volcano_name_list = []
for row in hazard_layer.get_data():
# Run through all polygons and get unique names
volcano_name_list.append(row[name_attribute])
volcano_names = ''
for name in volcano_name_list:
volcano_names += '%s, ' % name
volcano_names = volcano_names[:-2] # Strip trailing ', '
else:
volcano_names = tr('Not specified in data')
# Check if category_title exists in hazard_layer
if not category_title in hazard_layer.get_attribute_names():
msg = ('Hazard data %s did not contain expected '
'attribute %s ' % (hazard_layer.get_name(), category_title))
# noinspection PyExceptionInherit
raise InaSAFEError(msg)
# Find the target field name that has no conflict with default
# target
attribute_names = hazard_layer.get_attribute_names()
new_target_field = get_non_conflicting_attribute_name(
self.target_field, attribute_names)
self.target_field = new_target_field
# Run interpolation function for polygon2raster
interpolated_layer = assign_hazard_values_to_exposure_data(
hazard_layer, exposure_layer)
# Initialise attributes of output dataset with all attributes
# from input polygon and a building count of zero
new_data_table = hazard_layer.get_data()
categories = {}
for row in new_data_table:
row[self.target_field] = 0
category = row[category_title]
categories[category] = 0
# Count impacted building per polygon and total
#.........这里部分代码省略.........
def run(self, layers):
"""Risk plugin for flood population evacuation
Input
layers: List of layers expected to contain
H: Raster layer of flood depth
P: Raster layer of population data on the same grid as H
Counts number of people exposed to flood levels exceeding
specified threshold.
Return
Map of population exposed to flood levels exceeding the threshold
Table with number of people evacuated and supplies required
"""
# Identify hazard and exposure layers
H = get_hazard_layer(layers) # Flood inundation
E = get_exposure_layer(layers)
question = get_question(H.get_name(),
E.get_name(),
self)
# Check that hazard is polygon type
if not H.is_vector:
msg = ('Input hazard %s was not a vector layer as expected '
% H.get_name())
raise Exception(msg)
msg = ('Input hazard must be a polygon layer. I got %s with layer '
'type %s' % (H.get_name(),
H.get_geometry_name()))
if not H.is_polygon_data:
raise Exception(msg)
# Run interpolation function for polygon2raster
P = assign_hazard_values_to_exposure_data(H, E,
attribute_name='population')
# Initialise attributes of output dataset with all attributes
# from input polygon and a population count of zero
new_attributes = H.get_data()
category_title = 'FLOODPRONE' # FIXME: Should come from keywords
categories = {}
for attr in new_attributes:
attr[self.target_field] = 0
cat = attr[category_title]
categories[cat] = 0
# Count affected population per polygon, per category and total
evacuated = 0
for attr in P.get_data():
# Get population at this location
pop = float(attr['population'])
# Update population count for associated polygon
poly_id = attr['polygon_id']
new_attributes[poly_id][self.target_field] += pop
# Update population count for each category
cat = new_attributes[poly_id][category_title]
categories[cat] += pop
# Update total
evacuated += pop
# Count totals
total = int(numpy.sum(E.get_data(nan=0, scaling=False)))
# Don't show digits less than a 1000
if total > 1000:
total = total // 1000 * 1000
if evacuated > 1000:
evacuated = evacuated // 1000 * 1000
# Calculate estimated needs based on BNPB Perka 7/2008 minimum bantuan
rice = evacuated * 2.8
drinking_water = evacuated * 17.5
water = evacuated * 67
family_kits = evacuated / 5
toilets = evacuated / 20
# Generate impact report for the pdf map
table_body = [question,
TableRow([_('People needing evacuation'),
'%i' % evacuated],
header=True),
TableRow(_('Map shows population affected in each flood '
'prone area ')),
TableRow([_('Needs per week'), _('Total')],
header=True),
[_('Rice [kg]'), int(rice)],
[_('Drinking Water [l]'), int(drinking_water)],
[_('Clean Water [l]'), int(water)],
[_('Family Kits'), int(family_kits)],
[_('Toilets'), int(toilets)]]
impact_table = Table(table_body).toNewlineFreeString()
# Extend impact report for on-screen display
#.........这里部分代码省略.........
def run(self, layers):
"""Risk plugin for flood population evacuation
Input:
layers: List of layers expected to contain
my_hazard : Vector polygon layer of flood depth
my_exposure : Raster layer of population data on the same
grid as my_hazard
Counts number of people exposed to areas identified as flood prone
Return
Map of population exposed to flooding
Table with number of people evacuated and supplies required
"""
# Identify hazard and exposure layers
my_hazard = get_hazard_layer(layers) # Flood inundation
my_exposure = get_exposure_layer(layers)
question = get_question(my_hazard.get_name(),
my_exposure.get_name(),
self)
# Check that hazard is polygon type
if not my_hazard.is_vector:
msg = ('Input hazard %s was not a vector layer as expected '
% my_hazard.get_name())
raise Exception(msg)
msg = ('Input hazard must be a polygon layer. I got %s with layer '
'type %s' % (my_hazard.get_name(),
my_hazard.get_geometry_name()))
if not my_hazard.is_polygon_data:
raise Exception(msg)
# Run interpolation function for polygon2raster
P = assign_hazard_values_to_exposure_data(my_hazard, my_exposure,
attribute_name='population')
# Initialise attributes of output dataset with all attributes
# from input polygon and a population count of zero
new_attributes = my_hazard.get_data()
category_title = 'affected' # FIXME: Should come from keywords
deprecated_category_title = 'FLOODPRONE'
categories = {}
for attr in new_attributes:
attr[self.target_field] = 0
try:
cat = attr[category_title]
except KeyError:
cat = attr['FLOODPRONE']
categories[cat] = 0
# Count affected population per polygon, per category and total
affected_population = 0
for attr in P.get_data():
affected = False
if 'affected' in attr:
res = attr['affected']
if res is None:
x = False
else:
x = bool(res)
affected = x
elif 'FLOODPRONE' in attr:
# If there isn't an 'affected' attribute,
res = attr['FLOODPRONE']
if res is not None:
affected = res.lower() == 'yes'
elif 'Affected' in attr:
# Check the default attribute assigned for points
# covered by a polygon
res = attr['Affected']
if res is None:
x = False
else:
x = res
affected = x
else:
# there is no flood related attribute
msg = ('No flood related attribute found in %s. '
'I was looking fore either "Flooded", "FLOODPRONE" '
'or "Affected". The latter should have been '
'automatically set by call to '
'assign_hazard_values_to_exposure_data(). '
'Sorry I can\'t help more.')
raise Exception(msg)
if affected:
# Get population at this location
pop = float(attr['population'])
# Update population count for associated polygon
poly_id = attr['polygon_id']
new_attributes[poly_id][self.target_field] += pop
#.........这里部分代码省略.........
def run(self, layers):
"""Experimental impact function.
Input
layers: List of layers expected to contain
H: Polygon layer of inundation areas
E: Vector layer of roads
"""
target_field = self.parameters['target_field']
building_type_field = self.parameters['building_type_field']
affected_field = self.parameters['affected_field']
affected_value = self.parameters['affected_value']
# Extract data
H = get_hazard_layer(layers) # Flood
E = get_exposure_layer(layers) # Roads
question = get_question(H.get_name(),
E.get_name(),
self)
H = H.get_layer()
h_provider = H.dataProvider()
affected_field_index = h_provider.fieldNameIndex(affected_field)
if affected_field_index == -1:
message = tr('''Parameter "Affected Field"(='%s')
is not present in the
attribute table of the hazard layer.''' % (affected_field, ))
raise GetDataError(message)
E = E.get_layer()
srs = E.crs().toWkt()
e_provider = E.dataProvider()
fields = e_provider.fields()
# If target_field does not exist, add it:
if fields.indexFromName(target_field) == -1:
e_provider.addAttributes([QgsField(target_field,
QVariant.Int)])
target_field_index = e_provider.fieldNameIndex(target_field)
fields = e_provider.fields()
# Create layer for store the lines from E and extent
building_layer = QgsVectorLayer(
'Polygon?crs=' + srs, 'impact_buildings', 'memory')
building_provider = building_layer.dataProvider()
# Set attributes
building_provider.addAttributes(fields.toList())
building_layer.startEditing()
building_layer.commitChanges()
# Filter geometry and data using the extent
extent = QgsRectangle(*self.extent)
request = QgsFeatureRequest()
request.setFilterRect(extent)
# Split building_layer by H and save as result:
# 1) Filter from H inundated features
# 2) Mark buildings as inundated (1) or not inundated (0)
affected_field_type = \
h_provider.fields()[affected_field_index].typeName()
if affected_field_type in ['Real', 'Integer']:
affected_value = float(affected_value)
h_data = H.getFeatures(request)
hazard_poly = None
for mpolygon in h_data:
attrs = mpolygon.attributes()
if attrs[affected_field_index] != affected_value:
continue
if hazard_poly is None:
hazard_poly = QgsGeometry(mpolygon.geometry())
else:
# Make geometry union of inundated polygons
# But some mpolygon.geometry() could be invalid, skip them
tmp_geometry = hazard_poly.combine(mpolygon.geometry())
try:
if tmp_geometry.isGeosValid():
hazard_poly = tmp_geometry
except AttributeError:
pass
if hazard_poly is None:
message = tr('''There are no objects
in the hazard layer with
"Affected value"='%s'.
Please check the value or use other
extent.''' % (affected_value, ))
raise GetDataError(message)
e_data = E.getFeatures(request)
for feat in e_data:
building_geom = feat.geometry()
attrs = feat.attributes()
l_feat = QgsFeature()
l_feat.setGeometry(building_geom)
l_feat.setAttributes(attrs)
if hazard_poly.intersects(building_geom):
#.........这里部分代码省略.........
def run(self, layers):
"""Flood impact to buildings (e.g. from Open Street Map)
"""
threshold = 1.0 # Flood threshold [m]
# Extract data
H = get_hazard_layer(layers) # Depth
E = get_exposure_layer(layers) # Building locations
question = get_question(H.get_name(),
E.get_name(),
self)
# Determine attribute name for hazard levels
if H.is_raster:
hazard_attribute = 'depth'
else:
hazard_attribute = 'FLOODPRONE'
# Interpolate hazard level to building locations
I = assign_hazard_values_to_exposure_data(H, E,
attribute_name=hazard_attribute)
# Extract relevant exposure data
attribute_names = I.get_attribute_names()
attributes = I.get_data()
N = len(I)
# Calculate building impact
count = 0
buildings = {}
affected_buildings = {}
for i in range(N):
if hazard_attribute == 'depth':
# Get the interpolated depth
x = float(attributes[i]['depth'])
x = x > threshold
elif hazard_attribute == 'FLOODPRONE':
# Use interpolated polygon attribute
atts = attributes[i]
if 'FLOODPRONE' in atts:
res = atts['FLOODPRONE']
if res is None:
x = False
else:
x = res.lower() == 'yes'
else:
# If there isn't a flood prone attribute,
# assume that building is wet if inside polygon
# as flag by generic attribute AFFECTED
res = atts['Affected']
if res is None:
x = False
else:
x = res
else:
msg = (_('Unknown hazard type %s. '
'Must be either "depth" or "floodprone"')
% hazard_attribute)
raise Exception(msg)
# Count affected buildings by usage type if available
if 'type' in attribute_names:
usage = attributes[i]['type']
else:
usage = None
if usage is not None and usage != 0:
key = usage
else:
key = 'unknown'
if key not in buildings:
buildings[key] = 0
affected_buildings[key] = 0
# Count all buildings by type
buildings[key] += 1
if x is True:
# Count affected buildings by type
affected_buildings[key] += 1
# Count total affected buildings
count += 1
# Add calculated impact to existing attributes
attributes[i][self.target_field] = x
# Lump small entries and 'unknown' into 'other' category
for usage in buildings.keys():
x = buildings[usage]
if x < 25 or usage == 'unknown':
if 'other' not in buildings:
buildings['other'] = 0
affected_buildings['other'] = 0
buildings['other'] += x
affected_buildings['other'] += affected_buildings[usage]
#.........这里部分代码省略.........
def run(self, layers):
"""Plugin for impact of population as derived by categorised hazard.
Input
layers: List of layers expected to contain
hazard_layer: Raster layer of categorised hazard
exposure_layer: Raster layer of population data
Counts number of people exposed to each category of the hazard
Return
Map of population exposed to high category
Table with number of people in each category
"""
# The 3 category
high_t = self.parameters['Categorical thresholds'][2]
medium_t = self.parameters['Categorical thresholds'][1]
low_t = self.parameters['Categorical thresholds'][0]
# Identify hazard and exposure layers
hazard_layer = get_hazard_layer(layers) # Categorised Hazard
exposure_layer = get_exposure_layer(layers) # Population Raster
question = get_question(
hazard_layer.get_name(), exposure_layer.get_name(), self)
# Extract data as numeric arrays
C = hazard_layer.get_data(nan=0.0) # Category
# Calculate impact as population exposed to each category
P = exposure_layer.get_data(nan=0.0, scaling=True)
H = numpy.where(C <= high_t, P, 0)
M = numpy.where(C < medium_t, P, 0)
L = numpy.where(C < low_t, P, 0)
# Count totals
total = int(numpy.sum(P))
high = int(numpy.sum(H)) - int(numpy.sum(M))
medium = int(numpy.sum(M)) - int(numpy.sum(L))
low = int(numpy.sum(L))
total_impact = high + medium + low
# Don't show digits less than a 1000
total = round_thousand(total)
total_impact = round_thousand(total_impact)
high = round_thousand(high)
medium = round_thousand(medium)
low = round_thousand(low)
# Calculate estimated minimum needs
minimum_needs = self.parameters['minimum needs']
tot_needs = evacuated_population_weekly_needs(
total_impact, minimum_needs)
# Generate impact report for the pdf map
table_body = [
question,
TableRow([tr('People impacted '),
'%s' % format_int(total_impact)],
header=True),
TableRow([tr('People in high hazard area '),
'%s' % format_int(high)],
header=True),
TableRow([tr('People in medium hazard area '),
'%s' % format_int(medium)],
header=True),
TableRow([tr('People in low hazard area'),
'%s' % format_int(low)],
header=True)]
impact_table = Table(table_body).toNewlineFreeString()
# Extend impact report for on-screen display
table_body.extend([
TableRow(tr('Notes'), header=True),
tr('Map shows population density in high or medium hazard area'),
tr('Total population: %s') % format_int(total),
TableRow(tr(
'Table below shows the weekly minimum needs for all '
'affected people')),
TableRow([tr('Needs per week'), tr('Total')], header=True),
[tr('Rice [kg]'), format_int(tot_needs['rice'])],
[tr('Drinking Water [l]'), format_int(tot_needs['drinking_water'])],
[tr('Clean Water [l]'), format_int(tot_needs['water'])],
[tr('Family Kits'), format_int(tot_needs['family_kits'])],
[tr('Toilets'), format_int(tot_needs['toilets'])]
])
impact_summary = Table(table_body).toNewlineFreeString()
map_title = tr('People in high hazard areas')
# Generate 8 equidistant classes across the range of flooded population
# 8 is the number of classes in the predefined flood population style
# as imported
# noinspection PyTypeChecker
classes = numpy.linspace(
numpy.nanmin(M.flat[:]), numpy.nanmax(M.flat[:]), 8)
# Modify labels in existing flood style to show quantities
style_classes = style_info['style_classes']
#.........这里部分代码省略.........
def run(self, layers,
x=0.62275231, y=8.03314466): # , zeta=2.15):
"""Gender specific earthquake impact model
Input
layers: List of layers expected to contain
H: Raster layer of MMI ground shaking
P: Raster layer of population density
"""
# Define percentages of people being displaced at each mmi level
displacement_rate = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0,
7: 0.1, 8: 0.5, 9: 0.75, 10: 1.0}
# Extract input layers
intensity = get_hazard_layer(layers)
population = get_exposure_layer(layers)
question = get_question(intensity.get_name(),
population.get_name(),
self)
# Extract data grids
H = intensity.get_data() # Ground Shaking
P = population.get_data() # Population Density
# Calculate population affected by each MMI level
# FIXME (Ole): this range is 2-9. Should 10 be included?
mmi_range = range(2, 10)
number_of_exposed = {}
number_of_fatalities = {}
# Calculate fatality rates for observed Intensity values (H
# based on ITB power model
R = numpy.zeros(H.shape)
for mmi in mmi_range:
# Identify cells where MMI is in class i
mask = (H > mmi - 0.5) * (H <= mmi + 0.5)
# Count population affected by this shake level
I = numpy.where(mask, P, 0)
# Calculate expected number of fatalities per level
fatality_rate = numpy.power(10.0, x * mmi - y)
F = fatality_rate * I
# Sum up fatalities to create map
R += F
# Generate text with result for this study
# This is what is used in the real time system exposure table
number_of_exposed[mmi] = numpy.nansum(I.flat)
number_of_fatalities[mmi] = numpy.nansum(F.flat)
# Set resulting layer to zero when less than a threshold. This is to
# achieve transparency (see issue #126).
R[R < 1] = numpy.nan
# Total statistics
total = numpy.nansum(P.flat)
# Compute number of fatalities
fatalities = numpy.nansum(number_of_fatalities.values())
# Compute number of people displaced due to building collapse
displaced = 0
for mmi in mmi_range:
displaced += displacement_rate[mmi] * number_of_exposed[mmi]
displaced_women = displaced * 0.52 # Could be made province dependent
displaced_pregnant_women = displaced_women * 0.01387 # CHECK
# Generate impact report
table_body = [question]
# Add total fatality estimate
s = str(int(fatalities)).rjust(10)
table_body.append(TableRow([_('Number of fatalities'), s],
header=True))
# Add total estimate of people displaced
s = str(int(displaced)).rjust(10)
table_body.append(TableRow([_('Number of people displaced'), s],
header=True))
s = str(int(displaced_women)).rjust(10)
table_body.append(TableRow([_('Number of women displaced'), s],
header=True))
s = str(int(displaced_pregnant_women)).rjust(10)
table_body.append(TableRow([_('Number of pregnant women displaced'),
s],
header=True))
table_body.append(TableRow(_('Action Checklist:'), header=True))
table_body.append(_('Are enough shelters available for %i women?')
% displaced_women)
table_body.append(_('Are enough facilities available to assist %i '
'pregnant women?') % displaced_pregnant_women)
table_body.append(TableRow(_('Notes'), header=True))
#.........这里部分代码省略.........
请发表评论