forked from cemac/LivingLabDataApp
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathGenerateCPCMap.py
More file actions
282 lines (250 loc) · 10.3 KB
/
GenerateCPCMap.py
File metadata and controls
282 lines (250 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from stravalib.client import Client
import matplotlib
matplotlib.use('Agg')
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import datetime as dt
import pandas as pd
import sys
import os
"""
Script name: GenerateCPCMap.py
Author: JON
Date: January 2018
Purpose: Used to generate an interactive google map showing concentration data
collected by volunteers carrying a CPC (condensation particle counter)
around Leeds University campus.
"""
def ReadCPCFile(CPCtext):
lines = CPCtext.splitlines()
# Read in required metadata:
iStartDate = -999
iStartTime = -999
iSampleLen = -999
iHeader = -999
for i, l in enumerate(lines[0:20]):
if l[0:10] == "Start Date":
iStartDate = i
if l[0:10] == "Start Time":
iStartTime = i
if l[0:13] == "Sample Length":
iSampleLen = i
if "Time" in l and "Concentration" in l:
iHeader = i
assert iStartDate >= 0, "Start date not found in CPC file header"
assert iStartTime >= 0, "Start time not found in CPC file header"
assert iSampleLen >= 0, "Sample length not found in CPC file header"
assert iHeader >= 0, "CPC file data header must contain Time and Concentration fields"
# Start Date:
temp = lines[iStartDate].split(',')
splt = [int(x) for x in temp[1].split('/')]
# Year might be in YY or YYYY format:
if splt[2] > 2000:
startYear = splt[2]
else:
startYear = splt[2] + 2000
startDate = dt.date(startYear, splt[0], splt[1])
# Start Time:
temp = lines[iStartTime].split(',')
splt = [int(x) for x in temp[1].split(':')]
startTime = dt.time(splt[0], splt[1], splt[2])
# Start Date/Time:
startDateTime = dt.datetime(startDate.year, startDate.month,
startDate.day, startTime.hour, startTime.minute, startTime.second)
# Sample length:
temp = lines[iSampleLen].split(',')
if ':' in temp[1]:
splt = [int(x) for x in temp[1].split(':')]
sampleLen = splt[0] * 60 + splt[1]
else:
sampleLen = int(temp[1])
# Now read in CPC data:
# start and end lines of data block:
startLine = iHeader + 1
for i in range(len(lines) - 2, len(lines) - 10, -1):
temp = lines[i].split(',')
if(temp[0] == '' or 'Comment for Sample 1'):
continue
else:
break
endLine = i
# Get index of time and conc data columns:
splt = lines[iHeader].split(',')
if len(splt) <= 2 or splt[2] == '':
iTime = 0
iConc = 1
elif len(splt) == 3:
iTime = 1
iConc = 2
else:
sys.exit("Unexpected number of data columns in CPC file")
# read data in:
dateTime = []
conc = []
for l in lines[startLine:endLine]:
temp = l.split(',')
splt = [int(x) for x in temp[iTime].split(':')]
dateTime.append(dt.datetime(startDate.year, startDate.month,
startDate.day, splt[0], splt[1], splt[2]))
conc.append(int(float(temp[iConc])))
CPCData = pd.DataFrame(data={'conc': conc, 'dateTime': dateTime})
return CPCData, startDateTime, sampleLen
def FetchGPSData(tokensFile, CPCdate, CPClen):
client = Client()
# To get the saved access tokens below, I did the following:
# 1. Run the following lines:
# authorize_url = client.authorization_url(client_id=22380, redirect_uri='http://sustainability.leeds.ac.uk',approval_prompt='force')
# print(authorize_url)
# 2. Paste the above url into a browser, accept the request,
# and copy the 'code' from the resulting url into the following line,
# along with the client_secret which can be found under air pollution9 account on strava:
# access_token = client.exchange_code_for_token(client_id=22380, client_secret='***',
# code='***')
# 3. Extract token from the above variable:
# print(access_token)
# Saved access tokens:
f = open(tokensFile, 'r')
myTokens = f.read().splitlines()
f.close()
# Find activity which most closely matches CPC start date/time and sample length
# All activities within 5 mins of the CPC start date are considered
# The activity with the closest-matching elapsed time to the CPC sample length is then chosen
validActs = {}
for i, token in enumerate(myTokens):
client.access_token = token
# athlete = client.get_athlete()
# print(athlete.firstname,athlete.lastname+':')
myActivities = client.get_activities()
for activity in myActivities:
startDate = activity.start_date_local
# print(' '+activity.name+':',startDate,'Local time')
if abs((CPCdate - startDate).total_seconds()) < 60:
validActs.update({i: activity.id})
assert len(
validActs) > 0, "No GPS activities with a start time within 5 minutes of the CPC data file start time"
DeltaT = 1e10
for key, value in validActs.items():
client.access_token = myTokens[key]
activity = client.get_activity(value)
elap = activity.elapsed_time.seconds
thisDT = abs(CPClen - elap)
if thisDT < DeltaT:
DeltaT = thisDT
chosenAth = key
chosenAct = value
# Extract required data from chosen activity:
client.access_token = myTokens[chosenAth]
activity = client.get_activity(chosenAct)
startDate = activity.start_date_local
endDate = startDate + dt.timedelta(seconds=activity.elapsed_time.seconds)
endDateCPC = CPCdate + dt.timedelta(seconds=CPClen)
assert abs((endDateCPC - endDate).total_seconds()
) < 60, "No valid GPS activities with an end time within 1 minute of the CPC data file end time"
myTypes = ['time', 'latlng']
myStream = client.get_activity_streams(chosenAct, types=myTypes)
latlon = myStream['latlng'].data
lat = [latlon[i][0] for i in range(len(latlon))]
lon = [latlon[i][1] for i in range(len(latlon))]
time = myStream['time'].data
dateTime = [startDate + dt.timedelta(seconds=i) for i in time]
GPSData = pd.DataFrame(data={'lon': lon, 'lat': lat, 'dateTime': dateTime})
return GPSData
def NearestNghbr(CPCData, GPSData):
MergeData = pd.merge(CPCData, GPSData, on=['dateTime'])
assert MergeData.shape[0] > 0, "CPC and GPS times don't overlap"
MergeData = MergeData.drop('dateTime', axis=1)
return MergeData
def rgba_to_hex(rgba_color):
red = int(rgba_color[0] * 255)
green = int(rgba_color[1] * 255)
blue = int(rgba_color[2] * 255)
return '#{r:02x}{g:02x}{b:02x}'.format(r=red, g=green, b=blue)
def ArrayMiddle(minLatLng, maxLatLng):
return [np.mean([minLatLng[0], maxLatLng[0]]), np.mean([minLatLng[1], maxLatLng[1]])]
def ArrayStats(lats, lons):
arrstats = {}
arrstats['min'] = [min(lats), min(lons)]
arrstats['max'] = [max(lats), max(lons)]
arrstats['middle'] = ArrayMiddle(arrstats['min'], arrstats['max'])
return arrstats
def Median(arr):
return np.median(arr)
def elementMean(arr):
return np.mean(arr, axis=0)
def elementMin(arr):
return np.min(arr, axis=0)
def elementMax(arr):
return np.max(arr, axis=0)
def CreateBins(file):
# The below hack is needed because the 'encoding' argument to np.loadtxt only exists
# in numpy v1.14 and later. The production server uses numpy 1.14 whereas the faculty
# python modules currently use an older version. The 'encoding' argument seems to be
# required on the production server, or it crashes.
try:
if float(np.version.version[0:4]) >= 1.14:
binLims = np.loadtxt(file, delimiter=',',
dtype='int', skiprows=1, encoding='utf-8')
else:
binLims = np.loadtxt(file, delimiter=',', dtype='int', skiprows=1)
except:
binLims = np.loadtxt(file, delimiter=',', dtype='int', skiprows=1)
return binLims
def AssignColours(binLims, colorProfile):
# List of Colormaps: https://matplotlib.org/users/colormaps.html
colsHex = []
if(colorProfile == "gr"):
rgmap = {'red': ((0.0, 0.1, 0.1),
(0.2, 0.0, 0.0),
(0.5, 0.96, 0.96),
(0.9, 1.0, 1.0),
(1.0, 0.5, 0.5)
),
'green': ((0.0, 0.6, 0.6),
(0.2, 1.0, 1.0),
(0.5, 1.0, 1.0),
(0.9, 0.0, 0.0),
(1.0, 0.0, 0.0),
),
'blue': ((0.0, 0.1, 0.1),
(0.2, 0.0, 0.0),
(0.5, 0.35, 0.35),
(0.9, 0.0, 0.0),
(1.0, 1.0, 1.0),
)
}
cmap = mpl.colors.LinearSegmentedColormap('RedGreen', rgmap)
else:
if(colorProfile == "bg"):
colorMap = 'viridis'
elif(colorProfile == "by"):
colorMap = 'inferno'
else:
colorMap = 'viridis' # if error, default to colorblind
cmap = matplotlib.cm.get_cmap(colorMap)
for i in range(0, len(binLims) + 1): # generate a color for each bin
colsHex.append(rgba_to_hex(cmap(i * 1 / (len(binLims)))))
return colsHex
def CreateColourBar(binLims, colsHex, colorProfile):
fig = plt.figure(figsize=(8, 1))
axs = fig.add_axes([0.05, 0.55, 0.9, 0.2])
cmap = mpl.colors.ListedColormap(colsHex[1:-1])
cmap.set_under(colsHex[0])
cmap.set_over(colsHex[-1])
norm = mpl.colors.BoundaryNorm(binLims, cmap.N)
cb = mpl.colorbar.ColorbarBase(axs, cmap=cmap,
norm=norm,
boundaries=[0.] + binLims + [100000.],
extend='both',
# Make the length of each extension
# the same as the length of the
# interior colors:
extendfrac='auto',
ticks=binLims,
spacing='uniform',
orientation='horizontal')
cb.set_label('particles per cubic centimetre')
plt.savefig("static/colourbar_" + colorProfile
+ ".png", dpi=300, transparent=True)