
###############################################################################
# Copyright (C) 2013-2014 by gempa GmbH
#
# FDSNStation -- Implements the fdsnws-availability Web service, see
# http://www.fdsn.org/webservices/
#
# Feature notes:
# - additional request parameters:
# - excludetoolarge: boolean, default: true
#
# Author: Stephan Herrnkind
# Email: herrnkind@gempa.de
###############################################################################
from __future__ import absolute_import, division, print_function
from functools import cmp_to_key
from collections import OrderedDict
from twisted.cred import portal
from twisted.internet.threads import deferToThread
from twisted.web import http, resource, server
from zope.interface import implementer
from seiscomp import datamodel, io, logging
from seiscomp.client import Application
from seiscomp.core import Time
from .http import BaseResource
from .request import RequestOptions
from . import utils
DBMaxUInt = 18446744073709551615 # 2^64 - 1
VERSION = "1.0.0"
###############################################################################
class _AvailabilityRequestOptions(RequestOptions):
    """Request options shared by the fdsnws availability services.

    Adds the quality, merge, orderby, limit and includerestricted
    parameters on top of the common time/channel/output options parsed
    by RequestOptions.
    """

    # merge options
    VMergeSampleRate = 'samplerate'
    VMergeQuality = 'quality'
    VMergeOverlap = 'overlap'
    VMerge = [VMergeSampleRate, VMergeQuality]  # overlap option only available
                                                # in query method

    # orderby options
    VOrderByNSLC = 'nslc_time_quality_samplerate'
    VOrderByCount = 'timespancount'
    VOrderByCountDesc = 'timespancount_desc'
    VOrderByUpdate = 'latestupdate'
    VOrderByUpdateDesc = 'latestupdate_desc'
    VOrderBy = [
        VOrderByNSLC, VOrderByCount, VOrderByCountDesc, VOrderByUpdate,
        VOrderByUpdateDesc
    ]

    # format options
    VFormatText = 'text'
    VFormatGeoCSV = 'geocsv'
    VFormatJSON = 'json'
    VFormatRequest = 'request'
    OutputFormats = [VFormatText, VFormatGeoCSV, VFormatJSON, VFormatRequest]

    # request parameters
    PQuality = ['quality']
    PMerge = ['merge']
    POrderBy = ['orderby']
    PLimit = ['limit']
    PIncludeRestricted = ['includerestricted']

    POSTParams = RequestOptions.POSTParams + PQuality + PMerge + POrderBy + \
        PLimit + PIncludeRestricted
    GETParams = RequestOptions.GETParams + POSTParams

    #--------------------------------------------------------------------------
    def __init__(self):
        RequestOptions.__init__(self)
        self.service = 'availability-base'
        self.quality = None            # list of quality codes, None = all
        self.mergeSampleRate = None
        self.mergeQuality = None
        self.mergeOverlap = None
        self.orderBy = self.VOrderBy[0]
        self.limit = None
        self.includeRestricted = None
        self.showLatestUpdate = None

    #--------------------------------------------------------------------------
    def parse(self):
        """Parse and validate the availability specific request parameters.

        Raises:
            ValueError: if a parameter value is invalid.
        """
        self.parseTime()
        self.parseChannel()
        self.parseOutput()

        # quality: D, M, Q, R, * (optional)
        for v in self.getListValues(self.PQuality):
            if len(v) == 1:
                if v[0] == '*':
                    # wildcard disables the quality filter entirely
                    self.quality = None
                    break
                if v[0].isupper():
                    if self.quality is None:
                        self.quality = [v]
                    else:
                        self.quality.append(v)
                    continue
            self.raiseValueError(self.PQuality[0])

        # merge (optional)
        for v in self.getListValues(self.PMerge, True):
            if v not in self.VMerge:
                self.raiseValueError(self.PMerge[0])
            if v == self.VMergeSampleRate:
                self.mergeSampleRate = True
            elif v == self.VMergeQuality:
                self.mergeQuality = True
            elif v == self.VMergeOverlap:
                self.mergeOverlap = True

        # orderby (optional)
        key, value = self.getFirstValue(self.POrderBy)
        if value is not None:
            if value in self.VOrderBy:
                self.orderBy = value
            else:
                self.raiseValueError(key)

        # limit (optional)
        self.limit = self.parseInt(self.PLimit, 1, DBMaxUInt)

        # includeRestricted (optional)
        self.includeRestricted = self.parseBool(self.PIncludeRestricted)

    #--------------------------------------------------------------------------
    def extentIter(self, dac, user=None, access=None):
        """Iterate over all data extents matching this request.

        Args:
            dac: data availability cache providing extentsSorted()
            user: authenticated user attributes or None
            access: access filter used to authorize restricted streams

        Yields:
            tuple: (extent, oid, restricted) for each matching stream filter
        """
        # tuple: extent, oid, restricted
        for e in dac.extentsSorted():
            ext = e[0]
            restricted = e[2]
            wid = ext.waveformID()
            net = wid.networkCode()
            sta = wid.stationCode()
            loc = wid.locationCode()
            cha = wid.channelCode()
            if restricted:
                if not user:
                    # restricted data requires an authenticated user
                    continue
                if access:
                    # Clamp the authorization window to the globally
                    # requested time window (if any). The original code
                    # referenced the per-stream options before they were
                    # bound, raising a NameError for restricted extents.
                    startTime = ext.start()
                    endTime = ext.end()
                    if self.time:
                        if self.time.start and self.time.start > startTime:
                            startTime = self.time.start
                        if self.time.end and self.time.end < endTime:
                            endTime = self.time.end
                    if not access.authorize(user, net, sta, loc, cha,
                                            startTime, endTime):
                        continue
            for ro in self.streams:
                if ro.channel:
                    if not ro.channel.matchNet(net) or \
                       not ro.channel.matchSta(sta) or \
                       not ro.channel.matchLoc(loc) or \
                       not ro.channel.matchCha(cha):
                        continue
                if ro.time and not ro.time.match(ext.start(), ext.end()):
                    continue
                if not self.includeRestricted and restricted:
                    continue
                yield e
###############################################################################
class _AvailabilityExtentRequestOptions(_AvailabilityRequestOptions):
    """Request options of the availability 'extent' service."""

    #--------------------------------------------------------------------------
    def __init__(self):
        _AvailabilityRequestOptions.__init__(self)
        self.service = 'availability-extent'
        self.showLatestUpdate = True

    #--------------------------------------------------------------------------
    def attributeExtentIter(self, ext):
        """Yield the attribute extents of ``ext`` matching the requested
        time window and quality filter."""
        for idx in range(ext.dataAttributeExtentCount()):
            attrExt = ext.dataAttributeExtent(idx)
            if self.time and not self.time.match(attrExt.start(),
                                                 attrExt.end()):
                continue
            if self.quality and attrExt.quality() not in self.quality:
                continue
            yield attrExt
###############################################################################
class _AvailabilityQueryRequestOptions(_AvailabilityRequestOptions):
    """Request options of the availability 'query' service."""

    # additional merge options
    VMerge = _AvailabilityRequestOptions.VMerge + \
        [_AvailabilityRequestOptions.VMergeOverlap]

    # show options
    VShowLatestUpdate = 'latestupdate'
    VShow = [VShowLatestUpdate]

    # additional query specific request parameters
    PMergeGaps = ['mergegaps']
    PShow = ['show']
    PExcludeTooLarge = ['excludetoolarge']

    POSTParams = _AvailabilityRequestOptions.POSTParams + PMergeGaps + \
        PShow + PExcludeTooLarge
    GETParams = _AvailabilityRequestOptions.GETParams + PMergeGaps + \
        PShow + PExcludeTooLarge

    #--------------------------------------------------------------------------
    def __init__(self):
        _AvailabilityRequestOptions.__init__(self)
        self.service = 'availability-query'
        self.orderBy = None          # ordering is not supported by 'query'
        self.mergeGaps = None
        self.excludeTooLarge = None

    #--------------------------------------------------------------------------
    def parse(self):
        """Parse and validate the query specific request parameters.

        Raises:
            ValueError: if a parameter value is invalid, the request
                contains no stream selection or an orderby parameter was
                supplied.
        """
        _AvailabilityRequestOptions.parse(self)

        # merge gaps threshold (optional)
        self.mergeGaps = self.parseFloat(self.PMergeGaps, 0)

        # show (optional)
        for v in self.getListValues(self.PShow, True):
            if v not in self.VShow:
                # report the parameter name, consistent with all other
                # parameter validation errors
                self.raiseValueError(self.PShow[0])
            if v == self.VShowLatestUpdate:
                self.showLatestUpdate = True

        # exclude too large (optional, defaults to true)
        self.excludeTooLarge = self.parseBool(self.PExcludeTooLarge)
        if self.excludeTooLarge is None:
            self.excludeTooLarge = True

        if not self.channel:
            raise ValueError('Request contains no selections')

        if self.orderBy:
            raise ValueError("orderby not supported for query request")
###############################################################################
class _Availability(BaseResource):
    """Abstract base of the availability 'extent' and 'query' resources.

    Implements the common GET/POST/OPTIONS request handling and the text,
    request and GeoCSV response writers. Subclasses provide
    _createRequestOptions(), _processRequest() and _writeJSONChannels().
    """

    isLeaf = True

    #--------------------------------------------------------------------------
    def __init__(self, access=None, user=None):
        BaseResource.__init__(self, VERSION)
        self.access = access
        self.user = user

    #--------------------------------------------------------------------------
    def render_OPTIONS(self, req):
        # CORS pre-flight response
        req.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        req.setHeader('Access-Control-Allow-Headers',
                      'Accept, Content-Type, X-Requested-With, Origin')
        req.setHeader('Content-Type', 'text/plain')
        return ""

    #--------------------------------------------------------------------------
    def render_GET(self, req):
        # Parse and validate GET parameters
        ro = self._createRequestOptions()
        try:
            ro.parseGET(req.args)
            ro.parse()
            # the GET operation supports exactly one stream filter
            ro.streams.append(ro)
        except ValueError as e:
            logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)
        return self._prepareRequest(req, ro)

    #--------------------------------------------------------------------------
    def render_POST(self, req):
        # Parse and validate POST parameters
        ro = self._createRequestOptions()
        try:
            ro.parsePOST(req.content)
            ro.parse()
        except ValueError as e:
            logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)
        return self._prepareRequest(req, ro)

    #--------------------------------------------------------------------------
    def _prepareRequest(self, req, ro):
        """Set the response headers and defer the actual processing to a
        worker thread."""
        dac = Application.Instance().getDACache()
        if ro.format == ro.VFormatJSON:
            contentType = 'application/json'
            extension = 'json'
        elif ro.format == ro.VFormatGeoCSV:
            contentType = 'text/csv'
            extension = 'csv'
        else:
            contentType = 'text/plain'
            extension = 'txt'
        req.setHeader('Content-Type', contentType)
        req.setHeader('Content-Disposition',
                      'inline; filename=fdsnws-ext-availability_{0}.{1}'
                      .format(Time.GMT().iso(), extension))
        d = deferToThread(self._processRequest, req, ro, dac)
        req.notifyFinish().addErrback(utils.onCancel, d)
        d.addBoth(utils.onFinish, req)
        # The request is handled by the deferred object
        return server.NOT_DONE_YET

    #--------------------------------------------------------------------------
    @staticmethod
    def _formatTime(time, ms=False):
        """Format a time value as ISO 8601, optionally with microseconds."""
        if ms:
            return '{0}.{1:06d}Z'.format(time.toString('%FT%T'),
                                         time.microseconds())
        return time.toString('%FT%TZ')

    #--------------------------------------------------------------------------
    def _writeLines(self, req, lines, ro):
        """Dispatch to the writer matching the requested output format.

        Returns:
            tuple: (bytes written, lines written)
        """
        if ro.format == ro.VFormatText:
            return self._writeFormatText(req, lines, ro)
        if ro.format == ro.VFormatGeoCSV:
            return self._writeFormatGeoCSV(req, lines, ro)
        if ro.format == ro.VFormatJSON:
            return self._writeFormatJSON(req, lines, ro)
        if ro.format == ro.VFormatRequest:
            return self._writeFormatRequest(req, lines, ro)

        raise Exception("unknown response format: %s" % ro.format)

    #--------------------------------------------------------------------------
    def _writeFormatText(self, req, lines, ro):
        """Write the result lines as fixed-width plain text."""
        byteCount = 0
        lineCount = 0

        # the extent service uses a different update column name and alignment
        isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions

        nslc = '{0: <2} {1: <5} {2: <2} {3: <3}'
        quality = ' {0: <1}'
        sampleRate = ' {0: >11}'
        sampleRateF = ' {0: >11.1f}'
        time = ' {0: <27} {1: <27}'
        updated = ' {0: <20}'
        timeSpans = ' {0: >10}'
        restriction = ' {0: <11}'

        header = nslc.format('#N', 'S', 'L', 'C')
        if not ro.mergeQuality:
            header += quality.format('Q')
        if not ro.mergeSampleRate:
            header += sampleRate.format('SampleRate')
        header += time.format('Earliest', 'Latest')
        if ro.showLatestUpdate:
            header += updated.format('Updated')
        if isExtentReq:
            header += timeSpans.format('TimeSpans')
            header += restriction.format('Restriction')
        header += '\n'

        first = True
        for line in lines:
            # write the header only once data is available
            if first:
                first = False
                utils.writeTS(req, header)
                byteCount += len(header)

            wid = line[0].waveformID()
            e = line[1]
            loc = wid.locationCode() if wid.locationCode() else "--"
            data = nslc.format(wid.networkCode(), wid.stationCode(),
                               loc, wid.channelCode())
            if not ro.mergeQuality:
                data += quality.format(e.quality())
            if not ro.mergeSampleRate:
                data += sampleRateF.format(e.sampleRate())
            data += time.format(self._formatTime(e.start(), True),
                                self._formatTime(e.end(), True))
            if ro.showLatestUpdate:
                data += updated.format(self._formatTime(e.updated()))
            if isExtentReq:
                data += timeSpans.format(e.segmentCount())
                data += restriction.format('RESTRICTED' if line[2] else 'OPEN')
            data += '\n'
            utils.writeTS(req, data)
            byteCount += len(data)
            lineCount += 1

        return byteCount, lineCount

    #--------------------------------------------------------------------------
    def _writeFormatRequest(self, req, lines, ro):
        """Write the result lines in FDSN POST request format."""
        byteCount = 0
        lineCount = 0

        for line in lines:
            wid = line[0].waveformID()
            e = line[1]
            loc = wid.locationCode() if wid.locationCode() else "--"
            start = e.start()
            end = e.end()

            # truncate start and end time to requested time frame
            if ro.time:
                if ro.time.start and ro.time.start > start:
                    start = ro.time.start
                if ro.time.end and ro.time.end < end:
                    end = ro.time.end

            data = '{0} {1} {2} {3} {4} {5}\n'.format(
                wid.networkCode(), wid.stationCode(), loc, wid.channelCode(),
                self._formatTime(start, True), self._formatTime(end, True))
            utils.writeTS(req, data)
            byteCount += len(data)
            lineCount += 1

        return byteCount, lineCount

    #--------------------------------------------------------------------------
    def _writeFormatGeoCSV(self, req, lines, ro):
        """Write the result lines as GeoCSV 2.0 with '|' delimiter."""
        byteCount = 0
        lineCount = 0

        # the extent service uses a different update column name and alignment
        isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions

        nslc = '{0}|{1}|{2}|{3}'
        time = '|{0}|{1}'

        # header
        fieldUnit = '#field_unit: unitless|unitless|unitless|unitless'
        fieldType = '#field_type: string|string|string|string'
        fieldName = 'Network|Station|Location|Channel'

        if not ro.mergeQuality:
            fieldUnit += '|unitless'
            fieldType += '|string'
            fieldName += '|Quality'

        if not ro.mergeSampleRate:
            fieldUnit += ' |hertz'
            fieldType += ' |float'
            fieldName += '|SampleRate'

        fieldUnit += ' |ISO_8601|ISO_8601'
        fieldType += ' |datetime|datetime'
        fieldName += '|Earliest|Latest'

        if ro.showLatestUpdate:
            fieldUnit += '|ISO_8601'
            fieldType += '|datetime'
            fieldName += '|Updated'

        if isExtentReq:
            fieldUnit += '|unitless|unitless'
            fieldType += '|integer|string'
            fieldName += '|TimeSpans|Restriction'

        header = '#dataset: GeoCSV 2.0\n' \
                 '#delimiter: |\n'
        header += '{0}\n{1}\n{2}\n'.format(fieldUnit, fieldType, fieldName)

        first = True
        for line in lines:
            # write the header only once data is available
            if first:
                first = False
                utils.writeTS(req, header)
                byteCount += len(header)

            wid = line[0].waveformID()
            e = line[1]
            data = nslc.format(wid.networkCode(), wid.stationCode(),
                               wid.locationCode(), wid.channelCode())
            if not ro.mergeQuality:
                data += '|' + e.quality()
            if not ro.mergeSampleRate:
                data += '|{0:.1f}'.format(e.sampleRate())
            data += time.format(self._formatTime(e.start(), True),
                                self._formatTime(e.end(), True))
            if ro.showLatestUpdate:
                data += '|' + self._formatTime(e.updated())
            if isExtentReq:
                data += '|{0:d}'.format(e.segmentCount())
                data += '|RESTRICTED' if line[2] else '|OPEN'
            data += '\n'
            utils.writeTS(req, data)
            byteCount += len(data)
            lineCount += 1

        return byteCount, lineCount

    #--------------------------------------------------------------------------
    def _writeFormatJSON(self, req, lines, ro):
        """Write the result lines as JSON; the channel rendering is
        delegated to the subclass via _writeJSONChannels()."""
        header = '{{' \
                 '"created":"{0}",' \
                 '"version": 1.0,' \
                 '"datasources":['.format(self._formatTime(Time.GMT()))
        footer = ']}'

        return self._writeJSONChannels(req, header, footer, lines, ro)
###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityExtentRealm(object):
    """Twisted realm producing availability-extent resources for
    authenticated avatars."""

    #--------------------------------------------------------------------------
    def __init__(self, access):
        self.__access = access

    #--------------------------------------------------------------------------
    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return an FDSNAvailabilityExtent resource for the avatar."""
        if resource.IResource not in interfaces:
            raise NotImplementedError()
        user = {"mail": avatarId, "blacklisted": False}
        avatar = FDSNAvailabilityExtent(self.__access, user)
        return resource.IResource, avatar, lambda: None
###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityExtentAuthRealm(object):
    """Twisted realm producing availability-extent resources with user
    attributes looked up in the user database."""

    #--------------------------------------------------------------------------
    def __init__(self, access, userdb):
        self.__access = access
        self.__userdb = userdb

    #--------------------------------------------------------------------------
    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return an FDSNAvailabilityExtent resource for the avatar."""
        if resource.IResource not in interfaces:
            raise NotImplementedError()
        user = self.__userdb.getAttributes(avatarId)
        avatar = FDSNAvailabilityExtent(self.__access, user)
        return resource.IResource, avatar, lambda: None
###############################################################################
class FDSNAvailabilityExtent(_Availability):
    """Implementation of the fdsnws availability 'extent' service."""

    isLeaf = True

    #--------------------------------------------------------------------------
    def __init__(self, access=None, user=None):
        _Availability.__init__(self, access, user)

    #--------------------------------------------------------------------------
    @staticmethod
    def _createRequestOptions():
        return _AvailabilityExtentRequestOptions()

    #--------------------------------------------------------------------------
    @staticmethod
    def _mergeExtents(attributeExtents):
        """Merge attribute extents into one covering extent.

        Returns None for an empty input. The merged extent spans the
        earliest start to the latest end, carries the latest update time
        and the summed segment count.
        """
        merged = None
        cloned = False

        # Create a copy of the extent only if more than 1 element is found.
        # The number of elements is not known in advance since
        # attributeExtents might only be an iterator.
        for e in attributeExtents:
            if merged is None:
                merged = e
            else:
                if not cloned:
                    merged = datamodel.DataAttributeExtent(merged)
                    cloned = True
                if e.start() < merged.start():
                    merged.setStart(e.start())
                if e.end() > merged.end():
                    merged.setEnd(e.end())
                if e.updated() > merged.updated():
                    merged.setUpdated(e.updated())
                merged.setSegmentCount(
                    merged.segmentCount() + e.segmentCount())

        return merged

    #--------------------------------------------------------------------------
    def _writeJSONChannels(self, req, header, footer, lines, ro):
        """Write the extent result lines as a JSON datasource array."""
        byteCount = 0

        nslc = '{{' \
               '"network":"{0}",' \
               '"station":"{1}",' \
               '"location":"{2}",' \
               '"channel":"{3}"'
        quality = ',"quality":"{0}"'
        sampleRate = ',"samplerate":{0}'
        time = ',"earliest":"{0}","latest":"{1}","timespanCount":{2}' \
               ',"updated":"{3}","restriction":"{4}"}}'

        utils.writeTS(req, header)
        byteCount += len(header)

        data = None
        for line in lines:
            wid = line[0].waveformID()
            e = line[1]
            # separate consecutive objects with a comma
            data = "," if data is not None else ""
            data += nslc.format(wid.networkCode(), wid.stationCode(),
                                wid.locationCode(), wid.channelCode())
            if not ro.mergeQuality:
                data += quality.format(e.quality())
            if not ro.mergeSampleRate:
                data += sampleRate.format(e.sampleRate())
            data += time.format(self._formatTime(e.start(), True),
                                self._formatTime(e.end(), True),
                                e.segmentCount(),
                                self._formatTime(e.updated()),
                                'RESTRICTED' if line[2] else 'OPEN')
            utils.writeTS(req, data)
            byteCount += len(data)

        utils.writeTS(req, footer)
        byteCount += len(footer)

        return byteCount, len(lines)

    #--------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
        """Collect, merge, sort and write the matching data extents.

        Runs in a worker thread, see _Availability._prepareRequest().
        """
        if req._disconnected:
            return False

        # tuples: wid, attribute extent, restricted status
        lines = []

        mergeAll = ro.mergeQuality and ro.mergeSampleRate
        mergeNone = not ro.mergeQuality and not ro.mergeSampleRate

        # iterate extents
        for ext, _, restricted in ro.extentIter(dac, self.user, self.access):
            if req._disconnected:
                return False

            # iterate attribute extents and merge them if requested
            if mergeNone:
                for e in ro.attributeExtentIter(ext):
                    lines.append((ext, e, restricted))
            elif mergeAll:
                e = self._mergeExtents(ro.attributeExtentIter(ext))
                if e is not None:
                    lines.append((ext, e, restricted))
            elif ro.mergeQuality:
                eDict = {}  # key=sampleRate
                for e in ro.attributeExtentIter(ext):
                    if e.sampleRate() in eDict:
                        eDict[e.sampleRate()].append(e)
                    else:
                        eDict[e.sampleRate()] = [e]
                for k in eDict:
                    e = self._mergeExtents(eDict[k])
                    lines.append((ext, e, restricted))
            else:
                eDict = {}  # key=quality
                for e in ro.attributeExtentIter(ext):
                    if e.quality() in eDict:
                        eDict[e.quality()].append(e)
                    else:
                        eDict[e.quality()] = [e]
                for k in eDict:
                    e = self._mergeExtents(eDict[k])
                    lines.append((ext, e, restricted))

        # Return 204 if no matching availability information was found
        if len(lines) == 0:
            msg = "no matching availability information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return True

        # sort lines
        self._sortLines(lines, ro)

        # truncate lines to requested row limit
        if ro.limit:
            del lines[ro.limit:]

        byteCount, extCount = self._writeLines(req, lines, ro)

        logging.debug("%s: returned %i extents (total bytes: %i)" % (
            ro.service, extCount, byteCount))
        utils.accessLog(req, ro, http.OK, byteCount, None)

        return True

    #--------------------------------------------------------------------------
    @staticmethod
    def _sortLines(lines, ro):
        """Sort the result lines in place according to the orderby option."""

        def compareNSLC(l1, l2):
            if l1[0] is not l2[0]:
                # The lines are expected to be sorted according NSLC
                return 0
            e1 = l1[1]
            e2 = l2[1]
            if e1.start() < e2.start():
                return -1
            if e1.start() > e2.start():
                return 1
            if e1.end() < e2.end():
                return -1
            if e1.end() > e2.end():
                return 1
            if not ro.mergeQuality:
                if e1.quality() < e2.quality():
                    return -1
                if e1.quality() > e2.quality():
                    return 1
            if not ro.mergeSampleRate:
                if e1.sampleRate() < e2.sampleRate():
                    return -1
                if e1.sampleRate() > e2.sampleRate():
                    return 1
            return 0

        def compareCount(l1, l2):
            c1 = l1[1].segmentCount()
            c2 = l2[1].segmentCount()
            return -1 if c1 < c2 else \
                1 if c1 > c2 else \
                compareNSLC(l1, l2)

        def compareCountDesc(l1, l2):
            c1 = l1[1].segmentCount()
            c2 = l2[1].segmentCount()
            return -1 if c1 > c2 else \
                1 if c1 < c2 else \
                compareNSLC(l1, l2)

        def compareUpdate(l1, l2):
            c1 = l1[1].updated()
            c2 = l2[1].updated()
            return -1 if c1 < c2 else \
                1 if c1 > c2 else \
                compareNSLC(l1, l2)

        def compareUpdateDesc(l1, l2):
            c1 = l1[1].updated()
            c2 = l2[1].updated()
            return -1 if c1 > c2 else \
                1 if c1 < c2 else \
                compareNSLC(l1, l2)

        comparator = compareNSLC if ro.orderBy == ro.VOrderByNSLC else \
            compareCount if ro.orderBy == ro.VOrderByCount else \
            compareCountDesc if ro.orderBy == ro.VOrderByCountDesc else \
            compareUpdate if ro.orderBy == ro.VOrderByUpdate else \
            compareUpdateDesc
        lines.sort(key=cmp_to_key(comparator))
###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityQueryRealm(object):
    """Twisted realm producing availability-query resources for
    authenticated avatars."""

    #--------------------------------------------------------------------------
    def __init__(self, access):
        self.__access = access

    #--------------------------------------------------------------------------
    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return an FDSNAvailabilityQuery resource for the avatar."""
        if resource.IResource not in interfaces:
            raise NotImplementedError()
        user = {"mail": avatarId, "blacklisted": False}
        avatar = FDSNAvailabilityQuery(self.__access, user)
        return resource.IResource, avatar, lambda: None
###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityQueryAuthRealm(object):
    """Twisted realm producing availability-query resources with user
    attributes looked up in the user database."""

    #--------------------------------------------------------------------------
    def __init__(self, access, userdb):
        self.__access = access
        self.__userdb = userdb

    #--------------------------------------------------------------------------
    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return an FDSNAvailabilityQuery resource for the avatar."""
        if resource.IResource not in interfaces:
            raise NotImplementedError()
        user = self.__userdb.getAttributes(avatarId)
        avatar = FDSNAvailabilityQuery(self.__access, user)
        return resource.IResource, avatar, lambda: None
###############################################################################
class FDSNAvailabilityQuery(_Availability):
    """Implementation of the fdsnws availability 'query' service."""

    isLeaf = True

    #--------------------------------------------------------------------------
    def __init__(self, access=None, user=None):
        _Availability.__init__(self, access, user)

    #--------------------------------------------------------------------------
    @staticmethod
    def _createRequestOptions():
        return _AvailabilityQueryRequestOptions()

    #--------------------------------------------------------------------------
    def _writeJSONChannels(self, req, header, footer, lines, ro):
        """Write the query result lines as JSON.

        Depending on the merge options the time spans of one stream are
        grouped into a single channel object, grouped by sample rate, by
        quality or by (quality, sample rate) tuples.
        """
        byteCount = 0
        lineCount = 0

        nslc = '{{' \
               '"network":"{0}",' \
               '"station":"{1}",' \
               '"location":"{2}",' \
               '"channel":"{3}"'
        quality = ',"quality":"{0}"'
        sampleRate = ',"samplerate":{0}'
        updated = ',"updated":"{0}"'
        timeSpans = ',"timespans":[['
        seg = '"{0}","{1}"]'

        class SegGroup:
            """Segments of one group plus their latest update time."""
            def __init__(self, segments, updated):
                self.segments = segments
                self.updated = updated

        def writeSegments(segments):
            # write the timespans array, returns number of bytes written
            first = True
            byteCount = 0
            for s in segments:
                if first:
                    first = False
                    data = timeSpans
                else:
                    data = ",["
                data += seg.format(
                    self._formatTime(s.start(), True),
                    self._formatTime(s.end(), True))
                utils.writeTS(req, data)
                byteCount += len(data)
            return byteCount

        ext = None

        # merge of quality and sample rate: all timespans of one stream are
        # grouped into one channel
        if ro.mergeQuality and ro.mergeSampleRate:
            lastUpdate = None
            segments = None
            for line in lines:
                s = line[1]
                lineCount += 1
                if line[0] is not ext:
                    # stream changed: flush the previous stream
                    if ext is not None:
                        if byteCount == 0:
                            utils.writeTS(req, header)
                            byteCount += len(header)
                            data = ''
                        else:
                            data = ']},'
                        wid = ext.waveformID()
                        data += nslc.format(wid.networkCode(),
                                            wid.stationCode(),
                                            wid.locationCode(),
                                            wid.channelCode())
                        if ro.showLatestUpdate:
                            data += updated.format(
                                self._formatTime(lastUpdate))
                        utils.writeTS(req, data)
                        byteCount += len(data)
                        byteCount += writeSegments(segments)
                    ext = line[0]
                    lastUpdate = s.updated()
                    segments = [s]
                else:
                    if s.updated() > lastUpdate:
                        lastUpdate = s.updated()
                    segments.append(s)

            # handle last extent
            if ext is not None:
                if byteCount == 0:
                    utils.writeTS(req, header)
                    byteCount += len(header)
                    data = ''
                else:
                    data = ']},'
                wid = ext.waveformID()
                data += nslc.format(wid.networkCode(), wid.stationCode(),
                                    wid.locationCode(), wid.channelCode())
                if ro.showLatestUpdate:
                    data += updated.format(self._formatTime(lastUpdate))
                utils.writeTS(req, data)
                byteCount += len(data)
                byteCount += writeSegments(segments)
                data = ']}'
                utils.writeTS(req, data)
                byteCount += len(data)
                utils.writeTS(req, footer)
                byteCount += len(footer)

        # merge of quality: all timespans of one stream are grouped by
        # sample rate
        elif ro.mergeQuality:
            segGroups = None
            for line in lines:
                s = line[1]
                lineCount += 1
                if line[0] is not ext:
                    # stream changed: flush the previous stream
                    if ext is not None:
                        wid = ext.waveformID()
                        for sr, sg in segGroups.items():
                            if req._disconnected:
                                return False
                            if byteCount == 0:
                                utils.writeTS(req, header)
                                byteCount += len(header)
                                data = ''
                            else:
                                data = ']},'
                            data += nslc.format(wid.networkCode(),
                                                wid.stationCode(),
                                                wid.locationCode(),
                                                wid.channelCode())
                            data += sampleRate.format(sr)
                            if ro.showLatestUpdate:
                                data += updated.format(
                                    self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            byteCount += len(data)
                            byteCount += writeSegments(sg.segments)
                    ext = line[0]
                    segGroups = OrderedDict()
                    segGroups[s.sampleRate()] = SegGroup([s], s.updated())
                else:
                    if s.sampleRate() in segGroups:
                        segGroup = segGroups[s.sampleRate()]
                        if s.updated() > segGroup.updated:
                            segGroup.updated = s.updated()
                        segGroup.segments.append(s)
                    else:
                        segGroups[s.sampleRate()] = SegGroup([s], s.updated())

            # handle last extent
            if ext is not None:
                wid = ext.waveformID()
                for sr, sg in segGroups.items():
                    if req._disconnected:
                        return False
                    if byteCount == 0:
                        utils.writeTS(req, header)
                        byteCount += len(header)
                        data = ''
                    else:
                        data = ']},'
                    data += nslc.format(wid.networkCode(), wid.stationCode(),
                                        wid.locationCode(), wid.channelCode())
                    data += sampleRate.format(sr)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    byteCount += len(data)
                    byteCount += writeSegments(sg.segments)
                data = ']}'
                utils.writeTS(req, data)
                byteCount += len(data)
                utils.writeTS(req, footer)
                byteCount += len(footer)

        # merge of sample rate: all timespans of one stream are grouped by
        # quality
        elif ro.mergeSampleRate:
            segGroups = None
            for line in lines:
                s = line[1]
                lineCount += 1
                if line[0] is not ext:
                    # stream changed: flush the previous stream
                    if ext is not None:
                        wid = ext.waveformID()
                        for q, sg in segGroups.items():
                            if req._disconnected:
                                return False
                            if byteCount == 0:
                                utils.writeTS(req, header)
                                byteCount += len(header)
                                data = ''
                            else:
                                data = ']},'
                            data += nslc.format(wid.networkCode(),
                                                wid.stationCode(),
                                                wid.locationCode(),
                                                wid.channelCode())
                            data += quality.format(q)
                            if ro.showLatestUpdate:
                                data += updated.format(
                                    self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            byteCount += len(data)
                            byteCount += writeSegments(sg.segments)
                    ext = line[0]
                    segGroups = OrderedDict()
                    segGroups[s.quality()] = SegGroup([s], s.updated())
                else:
                    if s.quality() in segGroups:
                        segGroup = segGroups[s.quality()]
                        if s.updated() > segGroup.updated:
                            segGroup.updated = s.updated()
                        segGroup.segments.append(s)
                    else:
                        segGroups[s.quality()] = SegGroup([s], s.updated())

            # handle last extent
            if ext is not None:
                wid = ext.waveformID()
                for q, sg in segGroups.items():
                    if req._disconnected:
                        return False
                    if byteCount == 0:
                        utils.writeTS(req, header)
                        byteCount += len(header)
                        data = ''
                    else:
                        data = ']},'
                    data += nslc.format(wid.networkCode(), wid.stationCode(),
                                        wid.locationCode(), wid.channelCode())
                    data += quality.format(q)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    byteCount += len(data)
                    byteCount += writeSegments(sg.segments)
                data = ']}'
                utils.writeTS(req, data)
                byteCount += len(data)
                utils.writeTS(req, footer)
                byteCount += len(footer)

        # no merge: all timespans of one stream are grouped by tuple of
        # quality and sampleRate
        else:
            segGroups = None
            for line in lines:
                s = line[1]
                lineCount += 1
                if line[0] is not ext:
                    # stream changed: flush the previous stream
                    if ext is not None:
                        wid = ext.waveformID()
                        for (q, sr), sg in segGroups.items():
                            if req._disconnected:
                                return False
                            if byteCount == 0:
                                utils.writeTS(req, header)
                                byteCount += len(header)
                                data = ''
                            else:
                                data = ']},'
                            data += nslc.format(wid.networkCode(),
                                                wid.stationCode(),
                                                wid.locationCode(),
                                                wid.channelCode())
                            data += quality.format(q)
                            data += sampleRate.format(sr)
                            if ro.showLatestUpdate:
                                data += updated.format(
                                    self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            byteCount += len(data)
                            byteCount += writeSegments(sg.segments)
                    ext = line[0]
                    segGroups = OrderedDict()
                    segGroups[(s.quality(), s.sampleRate())] = \
                        SegGroup([s], s.updated())
                else:
                    t = (s.quality(), s.sampleRate())
                    if t in segGroups:
                        segGroup = segGroups[t]
                        if s.updated() > segGroup.updated:
                            segGroup.updated = s.updated()
                        segGroup.segments.append(s)
                    else:
                        segGroups[t] = SegGroup([s], s.updated())

            # handle last extent
            if ext is not None:
                wid = ext.waveformID()
                for (q, sr), sg in segGroups.items():
                    if req._disconnected:
                        return False
                    if byteCount == 0:
                        utils.writeTS(req, header)
                        byteCount += len(header)
                        data = ''
                    else:
                        data = ']},'
                    data += nslc.format(wid.networkCode(), wid.stationCode(),
                                        wid.locationCode(), wid.channelCode())
                    data += quality.format(q)
                    data += sampleRate.format(sr)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    byteCount += len(data)
                    byteCount += writeSegments(sg.segments)
                data = ']}'
                utils.writeTS(req, data)
                byteCount += len(data)
                utils.writeTS(req, footer)
                byteCount += len(footer)

        return byteCount, lineCount

    #--------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
        """Select the matching data segments from the database and write
        them in the requested format.

        Runs in a worker thread, see _Availability._prepareRequest().
        """
        if req._disconnected:
            return False

        # tuples: wid, segment, restricted status
        lines = []

        byteCount = 0

        # iterate extents and create IN clauses of parent_oids in bunches
        # of 1000 because the query size is limited
        parentOIDs, idList, tooLarge = [], [], []
        i = 0
        for ext, objID, _ in ro.extentIter(dac, self.user, self.access):
            if req._disconnected:
                return False

            if ro.excludeTooLarge:
                # silently skip extents exceeding the segment limit
                if ext.segmentOverflow():
                    continue
            elif ext.segmentOverflow():
                tooLarge.append(ext)
                continue
            elif tooLarge:
                # request will be rejected anyway, only collect overflows
                continue

            if i < 1000:
                idList.append(objID)
                i += 1
            else:
                parentOIDs.append(idList)
                idList = [objID]
                i = 1

        if not ro.excludeTooLarge and tooLarge:
            extents = ', '.join('{0}.{1}.{2}.{3}'.format(
                e.waveformID().networkCode(),
                e.waveformID().stationCode(),
                e.waveformID().locationCode(),
                e.waveformID().channelCode()) for e in tooLarge)
            msg = 'Unable to process request due to database limitations. ' \
                  'Some selections have too many segments to process. ' \
                  'Rejected extents: {{{0}}}. This limitation may be ' \
                  'resolved in a future version of this webservice.' \
                  .format(extents)
            self.writeErrorPage(req, http.REQUEST_ENTITY_TOO_LARGE, msg, ro)
            return False

        if len(idList) > 0:
            parentOIDs.append(idList)
        else:
            msg = "no matching availability information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return False

        db = io.DatabaseInterface.Open(Application.Instance().databaseURI())
        if db is None:
            msg = "could not connect to database"
            # consistent with the other error paths of this thread: write
            # the error page directly instead of returning page content
            self.writeErrorPage(req, http.SERVICE_UNAVAILABLE, msg, ro)
            return False

        lines = self._lineIter(db, parentOIDs, req, ro, dac.extentsOID())

        byteCount, segCount = self._writeLines(req, lines, ro)

        # Return 204 if no matching availability information was found
        if segCount <= 0:
            msg = "no matching availability information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return True

        logging.debug("%s: returned %i segments (total bytes: %i)" % (
            ro.service, segCount, byteCount))
        utils.accessLog(req, ro, http.OK, byteCount, None)

        return True

    #--------------------------------------------------------------------------
    @staticmethod
    def _lineIter(db, parentOIDs, req, ro, oIDs):
        """Yield (extent, segment, restricted) tuples read from the
        DataSegment table, optionally merging adjacent segments."""

        def _T(name):
            return db.convertColumnName(name)

        def _jitter(sampleRate):
            # gap/overlap tolerance: half a sample; guard against a
            # zero sample rate reported by broken metadata
            return 1 / (2 * sampleRate) if sampleRate else 0

        dba = datamodel.DatabaseArchive(db)
        for idList in parentOIDs:
            if req._disconnected:
                return

            # build SQL query
            q = 'SELECT * from DataSegment ' \
                'WHERE _parent_oid IN ({0}) ' \
                .format(','.join(str(x) for x in idList))
            if ro.time:
                if ro.time.start is not None:
                    if ro.time.start.microseconds() == 0:
                        q += "AND {0} >= '{1}' " \
                            .format(_T('end'), db.timeToString(ro.time.start))
                    else:
                        q += "AND ({0} > '{1}' OR (" \
                             "{0} = '{1}' AND end_ms >= {2})) " \
                             .format(_T('end'),
                                     db.timeToString(ro.time.start),
                                     ro.time.start.microseconds())
                if ro.time.end is not None:
                    if ro.time.end.microseconds() == 0:
                        q += "AND {0} < '{1}' " \
                            .format(_T('start'), db.timeToString(ro.time.end))
                    else:
                        q += "AND ({0} < '{1}' OR (" \
                             "{0} = '{1}' AND start_ms < {2})) " \
                             .format(_T('start'),
                                     db.timeToString(ro.time.end),
                                     ro.time.end.microseconds())
            if ro.quality:
                q += "AND {0} IN ('{1}') ".format(_T('quality'),
                                                  "', '".join(ro.quality))
            q += 'ORDER BY _parent_oid, {0}, {1}' \
                .format(_T('start'), _T('start_ms'))

            segIt = dba.getObjectIterator(q, datamodel.DataSegment.TypeInfo())
            if segIt is None or not segIt.valid():
                return

            # Iterate and optionally merge segments.
            # A segment will be yielded if
            #   - the extent changed
            #   - quality changed and merging of quality was not requested
            #   - sample rate changed and merging of sample rate was not
            #     requested
            #   - an overlap was detected and merging of overlaps was not
            #     requested
            #   - a gap was detected and the gap exceeds the mergeGaps
            #     threshold
            seg = None
            ext = None
            lines = 0
            while not req._disconnected and (seg is None or segIt.next()) and \
                    (not ro.limit or lines < ro.limit):
                s = datamodel.DataSegment.Cast(segIt.get())
                if s is None:
                    break

                # get extent for current segment
                try:
                    e, restricted = oIDs[segIt.parentOid()]
                except KeyError:
                    logging.warning("parent object id not found: %i",
                                    segIt.parentOid())
                    continue

                # first segment, no merge test required
                if seg is None:
                    seg = s
                    ext = e
                    jitter = _jitter(s.sampleRate())
                    continue

                # merge test
                diff = float(s.start() - seg.end())
                if e is ext and \
                        (ro.mergeQuality or
                         s.quality() == seg.quality()) and \
                        (ro.mergeSampleRate or
                         s.sampleRate() == seg.sampleRate()) and \
                        ((ro.mergeGaps is None and diff <= jitter) or
                         (ro.mergeGaps is not None and
                          diff <= ro.mergeGaps)) and \
                        (-diff <= jitter or ro.mergeOverlap):
                    seg.setEnd(s.end())
                    if s.updated() > seg.updated():
                        seg.setUpdated(s.updated())

                # merge was not possible, yield previous segment
                else:
                    yield (ext, seg, restricted)
                    lines += 1
                    # Recompute the gap tolerance if the sample rate
                    # changed. This must happen before seg is rebound to
                    # the current segment, otherwise the comparison is
                    # always false.
                    if seg.sampleRate() != s.sampleRate():
                        jitter = _jitter(s.sampleRate())
                    seg = s
                    ext = e

            if seg is not None and (not ro.limit or lines < ro.limit):
                yield (ext, seg, restricted)

        # iteration may have been stopped because of the row limit or a
        # client disconnect
        return
# vim: ts=4 et tw=79