###############################################################################
# Copyright (C) 2013-2014 by gempa GmbH
#
# FDSNStation -- Implements the fdsnws-availability Web service, see
#   http://www.fdsn.org/webservices/
#
# Feature notes:
#   - additional request parameters:
#     - excludetoolarge: boolean, default: true
#
# Author:  Stephan Herrnkind
# Email:   herrnkind@gempa.de
###############################################################################

from __future__ import absolute_import, division, print_function

from functools import cmp_to_key
from collections import OrderedDict

from twisted.cred import portal
from twisted.internet.threads import deferToThread
from twisted.web import http, resource, server

from zope.interface import implementer

from seiscomp import datamodel, io, logging
from seiscomp.client import Application
from seiscomp.core import Time

from .http import BaseResource
from .request import RequestOptions
from . import utils

# Largest value storable in an unsigned 64 bit database column; used as the
# upper bound for the user supplied 'limit' parameter.
DBMaxUInt = 18446744073709551615  # 2^64 - 1
VERSION = "1.0.0"


###############################################################################
class _AvailabilityRequestOptions(RequestOptions):
    """Request options shared by the extent and the query service.

    Parses and stores the parameters common to both fdsnws-availability
    methods: quality, merge, orderby, limit and includerestricted.
    """

    # merge options
    VMergeSampleRate = 'samplerate'
    VMergeQuality = 'quality'
    VMergeOverlap = 'overlap'
    VMerge = [VMergeSampleRate, VMergeQuality]  # overlap option only available
                                                # in query method

    # orderby options
    VOrderByNSLC = 'nslc_time_quality_samplerate'
    VOrderByCount = 'timespancount'
    VOrderByCountDesc = 'timespancount_desc'
    VOrderByUpdate = 'latestupdate'
    VOrderByUpdateDesc = 'latestupdate_desc'
    VOrderBy = [VOrderByNSLC, VOrderByCount, VOrderByCountDesc,
                VOrderByUpdate, VOrderByUpdateDesc]

    # format options
    VFormatText = 'text'
    VFormatGeoCSV = 'geocsv'
    VFormatJSON = 'json'
    VFormatRequest = 'request'
    OutputFormats = [VFormatText, VFormatGeoCSV, VFormatJSON, VFormatRequest]

    # request parameters
    PQuality = ['quality']
    PMerge = ['merge']
    POrderBy = ['orderby']
    PLimit = ['limit']
    PIncludeRestricted = ['includerestricted']

    POSTParams = RequestOptions.POSTParams + PQuality + PMerge + POrderBy + \
        PLimit + PIncludeRestricted
    GETParams = RequestOptions.GETParams + POSTParams

    #--------------------------------------------------------------------------
    def __init__(self):
        RequestOptions.__init__(self)
        self.service = 'availability-base'

        # parsed parameter values, populated by parse()
        self.quality = None           # list of quality codes or None for all
        self.mergeSampleRate = None
        self.mergeQuality = None
        self.mergeOverlap = None
        self.orderBy = self.VOrderBy[0]
        self.limit = None
        self.includeRestricted = None
        self.showLatestUpdate = None

    #--------------------------------------------------------------------------
    def parse(self):
        """Parse the common availability parameters from the request.

        Raises ValueError (via raiseValueError/parseInt/parseBool) on any
        invalid parameter value.
        """
        self.parseTime()
        self.parseChannel()
        self.parseOutput()

        # quality: D, M, Q, R, * (optional)
        for v in self.getListValues(self.PQuality):
            if len(v) == 1:
                if v[0] == '*':
                    # wildcard disables quality filtering entirely
                    self.quality = None
                    break
                if v[0].isupper():
                    if self.quality is None:
                        self.quality = [v]
                    else:
                        self.quality.append(v)
                    continue
            self.raiseValueError(self.PQuality[0])

        # merge (optional)
        for v in self.getListValues(self.PMerge, True):
            if v not in self.VMerge:
                self.raiseValueError(self.PMerge[0])

            if v == self.VMergeSampleRate:
                self.mergeSampleRate = True
            elif v == self.VMergeQuality:
                self.mergeQuality = True
            elif v == self.VMergeOverlap:
                self.mergeOverlap = True

        # orderby (optional)
        key, value = self.getFirstValue(self.POrderBy)
        if value is not None:
            if value in self.VOrderBy:
                self.orderBy = value
            else:
                self.raiseValueError(key)

        # limit (optional)
        self.limit = self.parseInt(self.PLimit, 1, DBMaxUInt)

        # includeRestricted (optional)
        self.includeRestricted = self.parseBool(self.PIncludeRestricted)

    #--------------------------------------------------------------------------
    def extentIter(self, dac, user=None, access=None):
        """Yield data extents matching the request's stream/time filters.

        dac: data availability cache providing extentsSorted().
        user/access: optional authentication info and access control object
        used to filter restricted extents.

        Yields tuples of (extent, oid, restricted).
        """
        # tuple: extent, oid, restricted
        for e in dac.extentsSorted():
            ext = e[0]
            restricted = e[2]
            wid = ext.waveformID()
            net = wid.networkCode()
            sta = wid.stationCode()
            loc = wid.locationCode()
            cha = wid.channelCode()

            if restricted:
                if not user:
                    continue
                if access:
                    # NOTE(review): 'ro' is referenced here before the
                    # 'for ro in self.streams' loop below binds it. On the
                    # first restricted extent this raises NameError (or, on
                    # later iterations, silently reuses the previous
                    # extent's stream options). Presumably the time window
                    # of the matching stream was intended — confirm against
                    # upstream before relying on restricted-access checks.
                    startTime = ext.start()
                    if ro.time.start() > startTime:
                        startTime = ro.time.start()
                    endTime = ext.end()
                    if ro.time.end() < ext.end():
                        endTime = ro.time.end()
                    if not access.authorize(user, net, sta, loc, cha,
                                            startTime, endTime):
                        continue

            for ro in self.streams:
                if ro.channel:
                    if not ro.channel.matchNet(net) or \
                       not ro.channel.matchSta(sta) or \
                       not ro.channel.matchLoc(loc) or \
                       not ro.channel.matchCha(cha):
                        continue
                if ro.time and not ro.time.match(ext.start(), ext.end()):
                    continue
                if not self.includeRestricted and restricted:
                    continue

                yield e


###############################################################################
class _AvailabilityExtentRequestOptions(_AvailabilityRequestOptions):
    """Request options specific to the fdsnws-availability extent method."""

    #--------------------------------------------------------------------------
    def __init__(self):
        _AvailabilityRequestOptions.__init__(self)
        self.service = 'availability-extent'

        # extent responses always carry the latest update column
        self.showLatestUpdate = True

    #--------------------------------------------------------------------------
    def attributeExtentIter(self, ext):
        """Yield attribute extents of 'ext' matching the time/quality filter."""
        for i in range(ext.dataAttributeExtentCount()):
            e = ext.dataAttributeExtent(i)

            if self.time and not self.time.match(e.start(), e.end()):
                continue
            if self.quality and e.quality() not in self.quality:
                continue

            yield e


###############################################################################
class _AvailabilityQueryRequestOptions(_AvailabilityRequestOptions):
    """Request options specific to the fdsnws-availability query method."""

    # additional merge options
    VMerge = _AvailabilityRequestOptions.VMerge + \
        [_AvailabilityRequestOptions.VMergeOverlap]

    # show options
    VShowLatestUpdate = 'latestupdate'
    VShow = [VShowLatestUpdate]

    # additional query specific request parameters
    PMergeGaps = ['mergegaps']
    PShow = ['show']
    PExcludeTooLarge = ['excludetoolarge']

    POSTParams = _AvailabilityRequestOptions.POSTParams + PMergeGaps + \
        PShow + PExcludeTooLarge
    GETParams = _AvailabilityRequestOptions.GETParams + PMergeGaps + \
        PShow + PExcludeTooLarge

    #--------------------------------------------------------------------------
    def __init__(self):
        _AvailabilityRequestOptions.__init__(self)
        self.service = 'availability-query'

        self.orderBy = None       # orderby is not supported by this method
        self.mergeGaps = None     # gap tolerance in seconds, or None
        self.excludeTooLarge = None

    #--------------------------------------------------------------------------
    def parse(self):
        """Parse query-specific parameters on top of the common ones.

        Raises ValueError on invalid values, on a request without stream
        selections or when an 'orderby' parameter is supplied.
        """
        _AvailabilityRequestOptions.parse(self)

        # merge gaps threshold (optional)
        self.mergeGaps = self.parseFloat(self.PMergeGaps, 0)

        # show (optional)
        for v in self.getListValues(self.PShow, True):
            if v not in self.VShow:
                self.raiseValueError(v)
            if v == self.VShowLatestUpdate:
                self.showLatestUpdate = True

        # exclude too large (optional), defaults to True
        self.excludeTooLarge = self.parseBool(self.PExcludeTooLarge)
        if self.excludeTooLarge is None:
            self.excludeTooLarge = True

        if not self.channel:
            raise ValueError('Request contains no selections')

        if self.orderBy:
            raise ValueError("orderby not supported for query request")


###############################################################################
class _Availability(BaseResource):
    """Common Twisted resource base of the extent and the query service.

    Handles request parsing, response header setup and the format-specific
    serialization of result lines. Subclasses supply _createRequestOptions(),
    _processRequest() and _writeJSONChannels().
    """

    isLeaf = True

    #--------------------------------------------------------------------------
    def __init__(self, access=None, user=None):
        BaseResource.__init__(self, VERSION)
        self.access = access
        self.user = user

    #--------------------------------------------------------------------------
    def render_OPTIONS(self, req):
        """Answer CORS pre-flight requests."""
        req.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        req.setHeader('Access-Control-Allow-Headers',
                      'Accept, Content-Type, X-Requested-With, Origin')
        req.setHeader('Content-Type', 'text/plain')
        return ""

    #--------------------------------------------------------------------------
    def render_GET(self, req):
        # Parse and validate GET parameters
        ro = self._createRequestOptions()
        try:
            ro.parseGET(req.args)
            ro.parse()

            # the GET operation supports exactly one stream filter
            ro.streams.append(ro)
        except ValueError as e:
            logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)

        return self._prepareRequest(req, ro)

    #--------------------------------------------------------------------------
    def render_POST(self, req):
        # Parse and validate POST parameters
        ro = self._createRequestOptions()
        try:
            ro.parsePOST(req.content)
            ro.parse()
        except ValueError as e:
            logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)

        return self._prepareRequest(req, ro)

    #--------------------------------------------------------------------------
    def _prepareRequest(self, req, ro):
        """Set response headers and defer the actual work to a thread."""
        dac = Application.Instance().getDACache()

        if ro.format == ro.VFormatJSON:
            contentType = 'application/json'
            extension = 'json'
        elif ro.format == ro.VFormatGeoCSV:
            contentType = 'text/csv'
            extension = 'csv'
        else:
            contentType = 'text/plain'
            extension = 'txt'

        req.setHeader('Content-Type', contentType)
        req.setHeader('Content-Disposition',
                      'inline; filename=fdsnws-ext-availability_{0}.{1}'
                      .format(Time.GMT().iso(), extension))

        d = deferToThread(self._processRequest, req, ro, dac)

        req.notifyFinish().addErrback(utils.onCancel, d)
        d.addBoth(utils.onFinish, req)

        # The request is handled by the deferred object
        return server.NOT_DONE_YET

    #--------------------------------------------------------------------------
    @staticmethod
    def _formatTime(time, ms=False):
        """Format a seiscomp Time as ISO 8601, with microseconds if ms=True."""
        if ms:
            return '{0}.{1:06d}Z'.format(time.toString('%FT%T'),
                                         time.microseconds())
        return time.toString('%FT%TZ')

    #--------------------------------------------------------------------------
    def _writeLines(self, req, lines, ro):
        """Dispatch result serialization according to the requested format.

        Returns a (byteCount, lineCount) tuple.
        """
        if ro.format == ro.VFormatText:
            return self._writeFormatText(req, lines, ro)
        if ro.format == ro.VFormatGeoCSV:
            return self._writeFormatGeoCSV(req, lines, ro)
        if ro.format == ro.VFormatJSON:
            return self._writeFormatJSON(req, lines, ro)
        if ro.format == ro.VFormatRequest:
            return self._writeFormatRequest(req, lines, ro)

        raise Exception("unknown reponse format: %s" % ro.format)

    #--------------------------------------------------------------------------
    def _writeFormatText(self, req, lines, ro):
        """Serialize lines as fixed-width plain text.

        The header is written lazily with the first data line so that an
        empty result produces no output at all.
        """
        byteCount = 0
        lineCount = 0

        # the extent service uses a different update column name and alignment
        isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions

        # column format templates
        nslc = '{0: <2} {1: <5} {2: <2} {3: <3}'
        quality = ' {0: <1}'
        sampleRate = ' {0: >11}'
        sampleRateF = ' {0: >11.1f}'
        time = ' {0: <27} {1: <27}'
        updated = ' {0: <20}'
        timeSpans = ' {0: >10}'
        restriction = ' {0: <11}'

        header = nslc.format('#N', 'S', 'L', 'C')
        if not ro.mergeQuality:
            header += quality.format('Q')
        if not ro.mergeSampleRate:
            header += sampleRate.format('SampleRate')
        header += time.format('Earliest', 'Latest')
        if ro.showLatestUpdate:
            header += updated.format('Updated')
        if isExtentReq:
            header += timeSpans.format('TimeSpans')
            header += restriction.format('Restriction')
        header += '\n'

        first = True
        for line in lines:
            if first:
                first = False
                utils.writeTS(req, header)
                byteCount += len(header)

            wid = line[0].waveformID()
            e = line[1]
            # empty location codes are rendered as '--'
            loc = wid.locationCode() if wid.locationCode() else "--"
            data = nslc.format(wid.networkCode(), wid.stationCode(), loc,
                               wid.channelCode())
            if not ro.mergeQuality:
                data += quality.format(e.quality())
            if not ro.mergeSampleRate:
                data += sampleRateF.format(e.sampleRate())
            data += time.format(self._formatTime(e.start(), True),
                                self._formatTime(e.end(), True))
            if ro.showLatestUpdate:
                data += updated.format(self._formatTime(e.updated()))
            if isExtentReq:
                data += timeSpans.format(e.segmentCount())
                data += restriction.format('RESTRICTED' if line[2] else 'OPEN')
            data += '\n'

            utils.writeTS(req, data)
            byteCount += len(data)
            lineCount += 1

        return byteCount, lineCount

    #--------------------------------------------------------------------------
    def _writeFormatRequest(self, req, lines, ro):
        """Serialize lines in fdsnws POST request format (NET STA LOC CHA T1 T2)."""
        byteCount = 0
        lineCount = 0

        for line in lines:
            wid = line[0].waveformID()
            e = line[1]
            loc = wid.locationCode() if wid.locationCode() else "--"
            start = e.start()
            end = e.end()

            # truncate start and end time to requested time frame
            if ro.time:
                if ro.time.start and ro.time.start > start:
                    start = ro.time.start
                if ro.time.end and ro.time.end < end:
                    end = ro.time.end

            data = '{0} {1} {2} {3} {4} {5}\n'.format(
                wid.networkCode(), wid.stationCode(), loc, wid.channelCode(),
                self._formatTime(start, True), self._formatTime(end, True))

            utils.writeTS(req, data)
            byteCount += len(data)
            lineCount += 1

        return byteCount, lineCount

    #--------------------------------------------------------------------------
    def _writeFormatGeoCSV(self, req, lines, ro):
        """Serialize lines as GeoCSV 2.0 with '|' as delimiter."""
        byteCount = 0
        lineCount = 0

        # the extent service uses a different update column name and alignment
        isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions

        nslc = '{0}|{1}|{2}|{3}'
        time = '|{0}|{1}'

        # header: GeoCSV metadata lines describing unit/type/name per column
        fieldUnit = '#field_unit: unitless|unitless|unitless|unitless'
        fieldType = '#field_type: string|string|string|string'
        fieldName = 'Network|Station|Location|Channel'

        if not ro.mergeQuality:
            fieldUnit += '|unitless'
            fieldType += '|string'
            fieldName += '|Quality'
        if not ro.mergeSampleRate:
            fieldUnit += ' |hertz'
            fieldType += ' |float'
            fieldName += '|SampleRate'
        fieldUnit += ' |ISO_8601|ISO_8601'
        fieldType += ' |datetime|datetime'
        fieldName += '|Earliest|Latest'
        if ro.showLatestUpdate:
            fieldUnit += '|ISO_8601'
            fieldType += '|datetime'
            fieldName += '|Updated'
        if isExtentReq:
            fieldUnit += '|unitless|unitless'
            fieldType += '|integer|string'
            fieldName += '|TimeSpans|Restriction'

        header = '#dataset: GeoCSV 2.0\n' \
                 '#delimiter: |\n'
        header += '{0}\n{1}\n{2}\n'.format(fieldUnit, fieldType, fieldName)

        first = True
        for line in lines:
            if first:
                first = False
                utils.writeTS(req, header)
                byteCount += len(header)

            wid = line[0].waveformID()
            e = line[1]
            data = nslc.format(wid.networkCode(), wid.stationCode(),
                               wid.locationCode(), wid.channelCode())
            if not ro.mergeQuality:
                data += '|' + e.quality()
            if not ro.mergeSampleRate:
                data += '|{0:.1f}'.format(e.sampleRate())
            data += time.format(self._formatTime(e.start(), True),
                                self._formatTime(e.end(), True))
            if ro.showLatestUpdate:
                data += '|' + self._formatTime(e.updated())
            if isExtentReq:
                data += '|{0:d}'.format(e.segmentCount())
                data += '|RESTRICTED' if line[2] else '|OPEN'
            data += '\n'

            utils.writeTS(req, data)
            byteCount += len(data)
            lineCount += 1

        return byteCount, lineCount

    #--------------------------------------------------------------------------
    def _writeFormatJSON(self, req, lines, ro):
        """Serialize lines as JSON; channel rendering is subclass-specific."""
        header = '{{' \
                 '"created":"{0}",' \
                 '"version": 1.0,' \
                 '"datasources":['.format(self._formatTime(Time.GMT()))
        footer = ']}'

        return self._writeJSONChannels(req, header, footer, lines, ro)


###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityExtentRealm(object):
    """Twisted cred realm returning the extent resource for token users."""

    #--------------------------------------------------------------------------
    def __init__(self, access):
        self.__access = access

    #--------------------------------------------------------------------------
    def requestAvatar(self, avatarId, mind, *interfaces):
        if resource.IResource in interfaces:
            user = {"mail": avatarId, "blacklisted": False}
            return (resource.IResource,
                    FDSNAvailabilityExtent(self.__access, user),
                    lambda: None)
        raise NotImplementedError()


###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityExtentAuthRealm(object):
    """Twisted cred realm returning the extent resource for userdb users."""

    #--------------------------------------------------------------------------
    def __init__(self, access, userdb):
        self.__access = access
        self.__userdb = userdb

    #--------------------------------------------------------------------------
    def requestAvatar(self, avatarId, mind, *interfaces):
        if resource.IResource in interfaces:
            user = self.__userdb.getAttributes(avatarId)
            return (resource.IResource,
                    FDSNAvailabilityExtent(self.__access, user),
                    lambda: None)
        raise NotImplementedError()


###############################################################################
class FDSNAvailabilityExtent(_Availability):
    """Implements the fdsnws-availability extent method."""

    isLeaf = True

    #--------------------------------------------------------------------------
    def __init__(self, access=None, user=None):
        _Availability.__init__(self, access, user)

    #--------------------------------------------------------------------------
    @staticmethod
    def _createRequestOptions():
        return _AvailabilityExtentRequestOptions()

    #--------------------------------------------------------------------------
    @staticmethod
    def _mergeExtents(attributeExtents):
        """Merge an iterable of attribute extents into one.

        The merged extent spans the earliest start to the latest end, carries
        the most recent update time and the summed segment count. Returns
        None for an empty input.
        """
        merged = None
        cloned = False

        # Create a copy of the extent only if more than 1 element is found. The
        # number of elements is not known in advance since attributeExtents
        # might only be an iterator.
        for e in attributeExtents:
            if merged is None:
                merged = e
            else:
                if not cloned:
                    merged = datamodel.DataAttributeExtent(merged)
                    cloned = True

                if e.start() < merged.start():
                    merged.setStart(e.start())
                if e.end() > merged.end():
                    merged.setEnd(e.end())
                if e.updated() > merged.updated():
                    merged.setUpdated(e.updated())
                merged.setSegmentCount(
                    merged.segmentCount() + e.segmentCount())

        return merged

    #--------------------------------------------------------------------------
    def _writeJSONChannels(self, req, header, footer, lines, ro):
        """Write one JSON datasource object per result line."""
        byteCount = 0

        nslc = '{{' \
               '"network":"{0}",' \
               '"station":"{1}",' \
               '"location":"{2}",' \
               '"channel":"{3}"'
        quality = ',"quality":"{0}"'
        sampleRate = ',"samplerate":{0}'
        time = ',"earliest":"{0}","latest":"{1}","timespanCount":{2}' \
               ',"updated":"{3}","restriction":"{4}"}}'

        utils.writeTS(req, header)
        byteCount += len(header)

        data = None
        for line in lines:
            wid = line[0].waveformID()
            e = line[1]
            # separate objects with a comma from the second line on
            data = "," if data is not None else ""
            data += nslc.format(wid.networkCode(), wid.stationCode(),
                                wid.locationCode(), wid.channelCode())
            if not ro.mergeQuality:
                data += quality.format(e.quality())
            if not ro.mergeSampleRate:
                data += sampleRate.format(e.sampleRate())
            data += time.format(self._formatTime(e.start(), True),
                                self._formatTime(e.end(), True),
                                e.segmentCount(),
                                self._formatTime(e.updated()),
                                format('RESTRICTED' if line[2] else 'OPEN'))

            utils.writeTS(req, data)
            byteCount += len(data)

        utils.writeTS(req, footer)
        byteCount += len(footer)

        return byteCount, len(lines)

    #--------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
        """Collect, optionally merge, sort and serialize extents.

        Runs in a worker thread (see _prepareRequest). Returns True when a
        response (including an error page) was written, False on disconnect.
        """
        if req._disconnected:
            return False

        # tuples: wid, attribute extent, restricted status
        lines = []

        mergeAll = ro.mergeQuality and ro.mergeSampleRate
        mergeNone = not ro.mergeQuality and not ro.mergeSampleRate

        # iterate extents
        for ext, _, restricted in ro.extentIter(dac, self.user, self.access):
            if req._disconnected:
                return False

            # iterate attribute extents and merge them if requested
            if mergeNone:
                for e in ro.attributeExtentIter(ext):
                    lines.append((ext, e, restricted))
            elif mergeAll:
                e = self._mergeExtents(ro.attributeExtentIter(ext))
                if e is not None:
                    lines.append((ext, e, restricted))
            elif ro.mergeQuality:
                eDict = {}  # key=sampleRate
                for e in ro.attributeExtentIter(ext):
                    if e.sampleRate() in eDict:
                        eDict[e.sampleRate()].append(e)
                    else:
                        eDict[e.sampleRate()] = [e]
                for k in eDict:
                    e = self._mergeExtents(eDict[k])
                    lines.append((ext, e, restricted))
            else:
                eDict = {}  # key=quality
                for e in ro.attributeExtentIter(ext):
                    if e.quality() in eDict:
                        eDict[e.quality()].append(e)
                    else:
                        eDict[e.quality()] = [e]
                for k in eDict:
                    e = self._mergeExtents(eDict[k])
                    lines.append((ext, e, restricted))

        # Return 204 if no matching availability information was found
        if len(lines) == 0:
            msg = "no matching availabilty information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return True

        # sort lines
        self._sortLines(lines, ro)

        # truncate lines to requested row limit
        if ro.limit:
            del lines[ro.limit:]

        byteCount, extCount = self._writeLines(req, lines, ro)

        logging.debug("%s: returned %i extents (total bytes: %i)" % (
                      ro.service, extCount, byteCount))
        utils.accessLog(req, ro, http.OK, byteCount, None)
        return True

    #--------------------------------------------------------------------------
    @staticmethod
    def _sortLines(lines, ro):
        """Sort result lines in place according to the orderby parameter."""

        def compareNSLC(l1, l2):
            # Fallback / primary comparator: by time, then quality and
            # sample rate unless those attributes are merged.
            if l1[0] is not l2[0]:
                # The lines are expected to be sorted according NSLC
                return 0

            e1 = l1[1]
            e2 = l2[1]
            if e1.start() < e2.start():
                return -1
            if e1.start() > e2.start():
                return 1
            if e1.end() < e2.end():
                return -1
            if e1.end() > e2.end():
                return 1
            if not ro.mergeQuality:
                if e1.quality() < e2.quality():
                    return -1
                if e1.quality() > e2.quality():
                    return 1
            if not ro.mergeSampleRate:
                if e1.sampleRate() < e2.sampleRate():
                    return -1
                if e1.sampleRate() > e2.sampleRate():
                    return 1
            return 0

        def compareCount(l1, l2):
            c1 = l1[1].segmentCount()
            c2 = l2[1].segmentCount()
            return -1 if c1 < c2 else \
                1 if c1 > c2 else \
                compareNSLC(l1, l2)

        def compareCountDesc(l1, l2):
            c1 = l1[1].segmentCount()
            c2 = l2[1].segmentCount()
            return -1 if c1 > c2 else \
                1 if c1 < c2 else \
                compareNSLC(l1, l2)

        def compareUpdate(l1, l2):
            c1 = l1[1].updated()
            c2 = l2[1].updated()
            return -1 if c1 < c2 else \
                1 if c1 > c2 else \
                compareNSLC(l1, l2)

        def compareUpdateDesc(l1, l2):
            c1 = l1[1].updated()
            c2 = l2[1].updated()
            return -1 if c1 > c2 else \
                1 if c1 < c2 else \
                compareNSLC(l1, l2)

        comparator = compareNSLC if ro.orderBy == ro.VOrderByNSLC else \
            compareCount if ro.orderBy == ro.VOrderByCount else \
            compareCountDesc if ro.orderBy == ro.VOrderByCountDesc else \
            compareUpdate if ro.orderBy == ro.VOrderByUpdate else \
            compareUpdateDesc
        lines.sort(key=cmp_to_key(comparator))


###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityQueryRealm(object):
    """Twisted cred realm returning the query resource for token users."""

    #--------------------------------------------------------------------------
    def __init__(self, access):
        self.__access = access

    #--------------------------------------------------------------------------
    def requestAvatar(self, avatarId, mind, *interfaces):
        if resource.IResource in interfaces:
            user = {"mail": avatarId, "blacklisted": False}
            return (resource.IResource,
                    FDSNAvailabilityQuery(self.__access, user),
                    lambda: None)
        raise NotImplementedError()


###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityQueryAuthRealm(object):
    """Twisted cred realm returning the query resource for userdb users."""

    #--------------------------------------------------------------------------
    def __init__(self, access, userdb):
        self.__access = access
        self.__userdb = userdb

    #--------------------------------------------------------------------------
    def requestAvatar(self, avatarId, mind, *interfaces):
        if resource.IResource in interfaces:
            user = self.__userdb.getAttributes(avatarId)
            return (resource.IResource,
                    FDSNAvailabilityQuery(self.__access, user),
                    lambda: None)
        raise NotImplementedError()


###############################################################################
class FDSNAvailabilityQuery(_Availability):
    """Implements the fdsnws-availability query method."""

    isLeaf = True

    #--------------------------------------------------------------------------
    def __init__(self, access=None, user=None):
        _Availability.__init__(self, access, user)

    #--------------------------------------------------------------------------
    @staticmethod
    def _createRequestOptions():
        return _AvailabilityQueryRequestOptions()

    #--------------------------------------------------------------------------
    def _writeJSONChannels(self, req, header, footer, lines, ro):
        """Write JSON datasources with nested timespan arrays.

        Lines are grouped per extent and, depending on the merge options,
        per sample rate, per quality or per (quality, sampleRate) tuple.
        The header is written lazily with the first group so that an empty
        result produces no output.
        """
        byteCount = 0
        lineCount = 0

        nslc = '{{' \
               '"network":"{0}",' \
               '"station":"{1}",' \
               '"location":"{2}",' \
               '"channel":"{3}"'
        quality = ',"quality":"{0}"'
        sampleRate = ',"samplerate":{0}'
        updated = ',"updated":"{0}"'
        timeSpans = ',"timespans":[['
        seg = '"{0}","{1}"]'

        class SegGroup:
            # simple record: segments sharing a group key + latest update time
            def __init__(self, segments, updated):
                self.segments = segments
                self.updated = updated

        def writeSegments(segments):
            # write the timespans array for one group; returns bytes written
            first = True
            byteCount = 0
            for s in segments:
                if first:
                    first = False
                    data = timeSpans
                else:
                    data = ",["

                data += seg.format(
                    self._formatTime(s.start(), True),
                    self._formatTime(s.end(), True))
                utils.writeTS(req, data)
                byteCount += len(data)

            return byteCount

        ext = None

        # merge of quality and sample rate: all timespans of one stream are
        # grouped into one channel
        if ro.mergeQuality and ro.mergeSampleRate:
            lastUpdate = None
            segments = None
            for line in lines:
                s = line[1]
                lineCount += 1
                if line[0] is not ext:
                    if ext is not None:
                        # flush previous extent
                        if byteCount == 0:
                            utils.writeTS(req, header)
                            byteCount += len(header)
                            data = ''
                        else:
                            data = ']},'
                        wid = ext.waveformID()
                        data += nslc.format(wid.networkCode(),
                                            wid.stationCode(),
                                            wid.locationCode(),
                                            wid.channelCode())
                        if ro.showLatestUpdate:
                            data += updated.format(
                                self._formatTime(lastUpdate))
                        utils.writeTS(req, data)
                        byteCount += len(data)
                        byteCount += writeSegments(segments)

                    ext = line[0]
                    lastUpdate = s.updated()
                    segments = [s]
                else:
                    if s.updated() > lastUpdate:
                        lastUpdate = s.updated()
                    segments.append(s)

            # handle last extent
            if ext is not None:
                if byteCount == 0:
                    utils.writeTS(req, header)
                    byteCount += len(header)
                    data = ''
                else:
                    data = ']},'
                wid = ext.waveformID()
                data += nslc.format(wid.networkCode(), wid.stationCode(),
                                    wid.locationCode(), wid.channelCode())
                if ro.showLatestUpdate:
                    data += updated.format(self._formatTime(lastUpdate))
                utils.writeTS(req, data)
                byteCount += len(data)
                byteCount += writeSegments(segments)

                data = ']}'
                utils.writeTS(req, data)
                byteCount += len(data)
                utils.writeTS(req, footer)
                byteCount += len(footer)

        # merge of quality: all timespans of one stream are grouped by
        # sample rate
        elif ro.mergeQuality:
            segGroups = None
            for line in lines:
                s = line[1]
                lineCount += 1
                if line[0] is not ext:
                    if ext is not None:
                        # flush previous extent, one object per sample rate
                        wid = ext.waveformID()
                        for sr, sg in segGroups.items():
                            if req._disconnected:
                                return False
                            if byteCount == 0:
                                utils.writeTS(req, header)
                                byteCount += len(header)
                                data = ''
                            else:
                                data = ']},'
                            data += nslc.format(wid.networkCode(),
                                                wid.stationCode(),
                                                wid.locationCode(),
                                                wid.channelCode())
                            data += sampleRate.format(sr)
                            if ro.showLatestUpdate:
                                data += updated.format(
                                    self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            byteCount += len(data)
                            byteCount += writeSegments(sg.segments)

                    ext = line[0]
                    segGroups = OrderedDict()
                    segGroups[s.sampleRate()] = SegGroup([s], s.updated())
                else:
                    if s.sampleRate() in segGroups:
                        segGroup = segGroups[s.sampleRate()]
                        if s.updated() > segGroup.updated:
                            segGroup.updated = s.updated()
                        segGroup.segments.append(s)
                    else:
                        segGroups[s.sampleRate()] = SegGroup([s], s.updated())

            # handle last extent
            if ext is not None:
                wid = ext.waveformID()
                for sr, sg in segGroups.items():
                    if req._disconnected:
                        return False
                    if byteCount == 0:
                        utils.writeTS(req, header)
                        byteCount += len(header)
                        data = ''
                    else:
                        data = ']},'
                    data += nslc.format(wid.networkCode(), wid.stationCode(),
                                        wid.locationCode(), wid.channelCode())
                    data += sampleRate.format(sr)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    byteCount += len(data)
                    byteCount += writeSegments(sg.segments)

                data = ']}'
                utils.writeTS(req, data)
                byteCount += len(data)
                utils.writeTS(req, footer)
                byteCount += len(footer)

        # merge of sample rate: all timespans of one stream are grouped by
        # quality
        elif ro.mergeSampleRate:
            segGroups = None
            for line in lines:
                s = line[1]
                lineCount += 1
                if line[0] is not ext:
                    if ext is not None:
                        # flush previous extent, one object per quality
                        wid = ext.waveformID()
                        for q, sg in segGroups.items():
                            if req._disconnected:
                                return False
                            if byteCount == 0:
                                utils.writeTS(req, header)
                                byteCount += len(header)
                                data = ''
                            else:
                                data = ']},'
                            data += nslc.format(wid.networkCode(),
                                                wid.stationCode(),
                                                wid.locationCode(),
                                                wid.channelCode())
                            data += quality.format(q)
                            if ro.showLatestUpdate:
                                data += updated.format(
                                    self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            byteCount += len(data)
                            byteCount += writeSegments(sg.segments)

                    ext = line[0]
                    segGroups = OrderedDict()
                    segGroups[s.quality()] = SegGroup([s], s.updated())
                else:
                    if s.quality() in segGroups:
                        segGroup = segGroups[s.quality()]
                        if s.updated() > segGroup.updated:
                            segGroup.updated = s.updated()
                        segGroup.segments.append(s)
                    else:
                        segGroups[s.quality()] = SegGroup([s], s.updated())

            # handle last extent
            if ext is not None:
                wid = ext.waveformID()
                for q, sg in segGroups.items():
                    if req._disconnected:
                        return False
                    if byteCount == 0:
                        utils.writeTS(req, header)
                        byteCount += len(header)
                        data = ''
                    else:
                        data = ']},'
                    data += nslc.format(wid.networkCode(), wid.stationCode(),
                                        wid.locationCode(), wid.channelCode())
                    data += quality.format(q)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    byteCount += len(data)
                    byteCount += writeSegments(sg.segments)

                data = ']}'
                utils.writeTS(req, data)
                byteCount += len(data)
                utils.writeTS(req, footer)
                byteCount += len(footer)

        # no merge: all timespans of one stream are grouped by tuple of
        # quality and sampleRate
        else:
            segGroups = None
            for line in lines:
                s = line[1]
                lineCount += 1
                if line[0] is not ext:
                    if ext is not None:
                        # flush previous extent, one object per
                        # (quality, sampleRate) tuple
                        wid = ext.waveformID()
                        for (q, sr), sg in segGroups.items():
                            if req._disconnected:
                                return False
                            if byteCount == 0:
                                utils.writeTS(req, header)
                                byteCount += len(header)
                                data = ''
                            else:
                                data = ']},'
                            data += nslc.format(wid.networkCode(),
                                                wid.stationCode(),
                                                wid.locationCode(),
                                                wid.channelCode())
                            data += quality.format(q)
                            data += sampleRate.format(sr)
                            if ro.showLatestUpdate:
                                data += updated.format(
                                    self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            byteCount += len(data)
                            byteCount += writeSegments(sg.segments)

                    ext = line[0]
                    segGroups = OrderedDict()
                    segGroups[(s.quality(), s.sampleRate())] = \
                        SegGroup([s], s.updated())
                else:
                    t = (s.quality(), s.sampleRate())
                    if t in segGroups:
                        segGroup = segGroups[t]
                        if s.updated() > segGroup.updated:
                            segGroup.updated = s.updated()
                        segGroup.segments.append(s)
                    else:
                        segGroups[t] = SegGroup([s], s.updated())

            # handle last extent
            if ext is not None:
                wid = ext.waveformID()
                for (q, sr), sg in segGroups.items():
                    if req._disconnected:
                        return False
                    if byteCount == 0:
                        utils.writeTS(req, header)
                        byteCount += len(header)
                        data = ''
                    else:
                        data = ']},'
                    data += nslc.format(wid.networkCode(), wid.stationCode(),
                                        wid.locationCode(), wid.channelCode())
                    data += quality.format(q)
                    data += sampleRate.format(sr)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    byteCount += len(data)
                    byteCount += writeSegments(sg.segments)

                data = ']}'
                utils.writeTS(req, data)
                byteCount += len(data)
                utils.writeTS(req, footer)
                byteCount += len(footer)

        return byteCount, lineCount

    #--------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
        """Resolve matching extents, query segments and serialize them.

        Runs in a worker thread (see _prepareRequest). Returns True when a
        response was written, False on disconnect or rejected request.
        """
        if req._disconnected:
            return False

        # tuples: wid, segment, restricted status
        lines = []

        byteCount = 0

        # iterate extents and create IN clauses of parent_oids in bunches
        # of 1000 because the query size is limited
        parentOIDs, idList, tooLarge = [], [], []
        i = 0
        for ext, objID, _ in ro.extentIter(dac, self.user, self.access):
            if req._disconnected:
                return False

            if ro.excludeTooLarge:
                if ext.segmentOverflow():
                    continue
            elif ext.segmentOverflow():
                tooLarge.append(ext)
                continue
            elif tooLarge:
                continue

            if i < 1000:
                idList.append(objID)
                i += 1
            else:
                parentOIDs.append(idList)
                idList = [objID]
                i = 1

        if not ro.excludeTooLarge and tooLarge:
            extents = ', '.join('{0}.{1}.{2}.{3}'.format(
                e.waveformID().networkCode(),
                e.waveformID().stationCode(),
                e.waveformID().locationCode(),
                e.waveformID().channelCode()) for e in tooLarge)

            msg = 'Unable to process request due to database limitations. ' \
                  'Some selections have too many segments to process. ' \
                  'Rejected extents: {{{0}}}. This limitation may be ' \
                  'resolved in a future version of this webservice.' \
                  .format(extents)
            self.writeErrorPage(req, http.REQUEST_ENTITY_TOO_LARGE, msg, ro)
            return False

        if len(idList) > 0:
            parentOIDs.append(idList)
        else:
            msg = "no matching availabilty information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return False

        db = io.DatabaseInterface.Open(Application.Instance().databaseURI())
        if db is None:
            msg = "could not connect to database"
            return self.renderErrorPage(req, http.SERVICE_UNAVAILABLE, msg,
                                        ro)

        lines = self._lineIter(db, parentOIDs, req, ro, dac.extentsOID())

        byteCount, segCount = self._writeLines(req, lines, ro)

        # Return 204 if no matching availability information was found
        if segCount <= 0:
            msg = "no matching availabilty information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return True

        logging.debug("%s: returned %i segments (total bytes: %i)" % (
                      ro.service, segCount, byteCount))
        utils.accessLog(req, ro, http.OK, byteCount, None)
        return True

    #--------------------------------------------------------------------------
    @staticmethod
    def _lineIter(db, parentOIDs, req, ro, oIDs):
        """Yield (extent, segment, restricted) tuples from the database.

        Builds one SQL query per OID bunch, iterates the resulting
        DataSegment rows in time order and merges adjacent segments
        according to the merge/gap options.
        """

        def _T(name):
            # map logical column name to the database-specific one
            return db.convertColumnName(name)

        dba = datamodel.DatabaseArchive(db)

        for idList in parentOIDs:
            if req._disconnected:
                return

            # build SQL query
            q = 'SELECT * from DataSegment ' \
                'WHERE _parent_oid IN ({0}) ' \
                .format(','.join(str(x) for x in idList))
            if ro.time:
                if ro.time.start is not None:
                    if ro.time.start.microseconds() == 0:
                        q += "AND {0} >= '{1}' " \
                            .format(_T('end'),
                                    db.timeToString(ro.time.start))
                    else:
                        q += "AND ({0} > '{1}' OR (" \
                             "{0} = '{1}' AND end_ms >= {2})) " \
                             .format(_T('end'),
                                     db.timeToString(ro.time.start),
                                     ro.time.start.microseconds())
                if ro.time.end is not None:
                    if ro.time.end.microseconds() == 0:
                        q += "AND {0} < '{1}' " \
                            .format(_T('start'),
                                    db.timeToString(ro.time.end))
                    else:
                        q += "AND ({0} < '{1}' OR (" \
                             "{0} = '{1}' AND start_ms < {2})) " \
                             .format(_T('start'),
                                     db.timeToString(ro.time.end),
                                     ro.time.end.microseconds())
            if ro.quality:
                q += "AND {0} IN ('{1}') ".format(_T('quality'),
                                                  "', '".join(ro.quality))
            q += 'ORDER BY _parent_oid, {0}, {1}' \
                 .format(_T('start'), _T('start_ms'))

            segIt = dba.getObjectIterator(q, datamodel.DataSegment.TypeInfo())
            if segIt is None or not segIt.valid():
                return

            # Iterate and optionally merge segments.
            # A segment will be yielded if
            #   - the extent changed
            #   - quality changed and merging of quality was not requested
            #   - sample rate changed and merging of sample rate was not
            #     requested
            #   - an overlap was detected and merging of overlaps was not
            #     requested
            #   - an gap was detected and the gaps exceeds the mergeGaps
            #     threshold
            seg = None
            ext = None
            lines = 0
            while not req._disconnected and \
                    (seg is None or segIt.next()) and \
                    (not ro.limit or lines < ro.limit):
                s = datamodel.DataSegment.Cast(segIt.get())
                if s is None:
                    break

                # get extent for current segment
                try:
                    e, restricted = oIDs[segIt.parentOid()]
                except KeyError:
                    logging.warning("parent object id not found: %i",
                                    segIt.parentOid())
                    continue

                # first segment, no merge test required
                if seg is None:
                    seg = s
                    ext = e
                    # half a sample period: tolerance for gap/overlap tests
                    # NOTE(review): raises ZeroDivisionError for a segment
                    # with sampleRate() == 0 — confirm such rows cannot occur.
                    jitter = 1 / (2 * s.sampleRate())
                    continue

                # merge test
                diff = float(s.start() - seg.end())
                if e is ext and \
                   (ro.mergeQuality or s.quality() == seg.quality()) and \
                   (ro.mergeSampleRate or
                        s.sampleRate() == seg.sampleRate()) and \
                   ((ro.mergeGaps is None and diff <= jitter) or \
                        (ro.mergeGaps is not None and
                         diff <= ro.mergeGaps)) and \
                   (-diff <= jitter or ro.mergeOverlap):
                    seg.setEnd(s.end())
                    if s.updated() > seg.updated():
                        seg.setUpdated(s.updated())

                # merge was not possible, yield previous segment
                else:
                    yield (ext, seg, restricted)
                    lines += 1
                    seg = s
                    ext = e
                    # NOTE(review): 'seg' was just set to 's', so this
                    # comparison is always False and 'jitter' is never
                    # recomputed on a sample rate change — presumably the
                    # previous segment's rate was meant. Confirm upstream.
                    if seg.sampleRate() != s.sampleRate():
                        jitter = 1 / (2 * s.sampleRate())

            if seg is not None and (not ro.limit or lines < ro.limit):
                yield (ext, seg, restricted)

        # close database iterator if iteration was stopped because of
        # row limit
        return


# vim: ts=4 et tw=79