################################################################################
# Copyright (C) 2013-2014 gempa GmbH
#
# FDSNStation -- Implements the fdsnws-station Web service, see
#   http://www.fdsn.org/webservices/
#
# Feature notes:
#   - 'updatedafter' request parameter not implemented: The last modification
#     time in SeisComP is tracked on the object level. If a child of an object
#     is updated the update time is not propagated to all parents. In order to
#     check if a station was updated all children must be evaluated
#     recursively. This operation would be much too expensive.
#   - additional request parameters:
#     - formatted:       boolean, default: false
#   - additional values of request parameters:
#     - format
#       - standard:      [xml, text]
#       - additional:    [fdsnxml (=xml), stationxml, sc3ml]
#       - default:       xml
#
# Author:  Stephan Herrnkind
# Email:   herrnkind@gempa.de
################################################################################

from __future__ import absolute_import, division, print_function

from twisted.internet.threads import deferToThread
from twisted.web import http, server

import seiscomp.datamodel
import seiscomp.logging
from seiscomp.client import Application
from seiscomp.core import Time
from seiscomp.io import Exporter, ExportObjectList

from .http import BaseResource
from .request import RequestOptions
from . import utils

VERSION = "1.1.4"


################################################################################
class _StationRequestOptions(RequestOptions):
    """Request options of the fdsnws-station service.

    Parses the station specific parameters (level, includerestricted,
    includeavailability, updatedafter, matchtimeseries, formatted) and
    provides iterators which filter the inventory tree by the stream filters
    collected in ``self.streams``.
    """

    # maps the value of the 'format' parameter to the exporter module name
    Exporters = {'xml': 'fdsnxml',
                 'fdsnxml': 'fdsnxml',
                 'stationxml': 'staxml',
                 'sc3ml': 'trunk'}
    MinTime = Time(0, 1)

    VText = ['text']
    # The default format must be the first entry. The list is spelled out
    # explicitly because list(Exporters) has random order.
    OutputFormats = ['xml', 'fdsnxml', 'stationxml', 'sc3ml'] + VText

    PLevel = ['level']
    PIncludeRestricted = ['includerestricted']
    PIncludeAvailability = ['includeavailability']
    # The FDSN parameter name is 'updatedafter'. The former misspelling
    # 'updateafter' is kept as an alias for backward compatibility.
    PUpdateAfter = ['updatedafter', 'updateafter']
    PMatchTimeSeries = ['matchtimeseries']

    # non standard parameters
    PFormatted = ['formatted']

    POSTParams = RequestOptions.POSTParams + RequestOptions.GeoParams + \
        PLevel + PIncludeRestricted + PIncludeAvailability + \
        PUpdateAfter + PMatchTimeSeries + PFormatted
    GETParams = RequestOptions.GETParams + RequestOptions.WindowTimeParams + \
        POSTParams

    #---------------------------------------------------------------------------
    def __init__(self):
        RequestOptions.__init__(self)
        self.service = 'fdsnws-station'

        # detail level flags, default level is 'station'
        self.includeSta = True
        self.includeCha = False
        self.includeRes = False

        self.restricted = None
        self.availability = None
        self.updatedAfter = None
        self.matchTimeSeries = None

        # non standard parameters
        self.formatted = None

    #---------------------------------------------------------------------------
    def parse(self):
        """Parse all request parameters into the attributes of this object.

        An unknown value of the 'level' parameter is reported through
        ``self.raiseValueError``.
        """
        self.parseTime(True)
        self.parseChannel()
        self.parseGeo()
        self.parseOutput()

        # level: [network, station, channel, response]
        key, value = self.getFirstValue(self.PLevel)
        if value is not None:
            value = value.lower()
            if value in ('network', 'net'):
                self.includeSta = False
            elif value in ('channel', 'cha', 'chan'):
                self.includeCha = True
            elif value in ('response', 'res', 'resp'):
                self.includeCha = True
                self.includeRes = True
            elif value not in ('station', 'sta'):
                self.raiseValueError(key)

        # includeRestricted (optional)
        self.restricted = self.parseBool(self.PIncludeRestricted)

        # includeAvailability (optional)
        self.availability = self.parseBool(self.PIncludeAvailability)

        # updatedAfter (optional), currently not supported
        self.updatedAfter = self.parseTimeStr(self.PUpdateAfter)

        # matchTimeSeries (optional)
        self.matchTimeSeries = self.parseBool(self.PMatchTimeSeries)

        # format XML
        self.formatted = self.parseBool(self.PFormatted)

    #---------------------------------------------------------------------------
    def networkIter(self, inv, matchTime=False):
        """Yield each network of `inv` matching at least one stream filter.

        The epoch of the network is only compared with the time filter if
        `matchTime` is set. A network is yielded at most once.
        """
        for i in range(inv.networkCount()):
            net = inv.network(i)

            for ro in self.streams:
                # network code
                if ro.channel and not ro.channel.matchNet(net.code()):
                    continue

                # start and end time
                if matchTime and ro.time:
                    try:
                        end = net.end()
                    except ValueError:
                        # open epoch
                        end = None
                    if not ro.time.match(net.start(), end):
                        continue

                yield net
                break

    #---------------------------------------------------------------------------
    def stationIter(self, net, matchTime=False):
        """Yield each station of `net` matching the geographic filter and at
        least one stream filter.

        Stations without coordinates are skipped if a geographic filter is
        active. A station is yielded at most once.
        """
        for i in range(net.stationCount()):
            sta = net.station(i)

            # geographic location
            if self.geo:
                try:
                    lat = sta.latitude()
                    lon = sta.longitude()
                except ValueError:
                    continue
                if not self.geo.match(lat, lon):
                    continue

            for ro in self.streams:
                # station code
                if ro.channel and (not ro.channel.matchSta(sta.code()) or
                                   not ro.channel.matchNet(net.code())):
                    continue

                # start and end time
                if matchTime and ro.time:
                    try:
                        end = sta.end()
                    except ValueError:
                        # open epoch
                        end = None
                    if not ro.time.match(sta.start(), end):
                        continue

                yield sta
                break

    #---------------------------------------------------------------------------
    def locationIter(self, net, sta, matchTime=False):
        """Yield each sensor location of `sta` matching at least one stream
        filter. A location is yielded at most once.
        """
        for i in range(sta.sensorLocationCount()):
            loc = sta.sensorLocation(i)

            for ro in self.streams:
                # location code
                if ro.channel and (not ro.channel.matchLoc(loc.code()) or
                                   not ro.channel.matchSta(sta.code()) or
                                   not ro.channel.matchNet(net.code())):
                    continue

                # start and end time
                if matchTime and ro.time:
                    try:
                        end = loc.end()
                    except ValueError:
                        # open epoch
                        end = None
                    if not ro.time.match(loc.start(), end):
                        continue

                yield loc
                break

    #---------------------------------------------------------------------------
    def streamIter(self, net, sta, loc, matchTime, dac):
        """Yield each stream of `loc` matching at least one stream filter.

        If a data availability cache `dac` is given and 'matchtimeseries' was
        requested, a stream is only yielded if a data extent exists for it
        which matches the time filter (if any). A stream is yielded at most
        once.
        """
        for i in range(loc.streamCount()):
            stream = loc.stream(i)

            for ro in self.streams:
                # stream code
                if ro.channel and (not ro.channel.matchCha(stream.code()) or
                                   not ro.channel.matchLoc(loc.code()) or
                                   not ro.channel.matchSta(sta.code()) or
                                   not ro.channel.matchNet(net.code())):
                    continue

                # start and end time
                if matchTime and ro.time:
                    try:
                        end = stream.end()
                    except ValueError:
                        # open epoch
                        end = None
                    if not ro.time.match(stream.start(), end):
                        continue

                # match data availability extent
                if dac is not None and self.matchTimeSeries:
                    extent = dac.extent(net.code(), sta.code(), loc.code(),
                                        stream.code())
                    if extent is None or (
                            ro.time and not ro.time.match(extent.start(),
                                                          extent.end())):
                        continue

                yield stream
                break


################################################################################
class FDSNStation(BaseResource):
    """Twisted resource serving fdsnws-station requests from an inventory."""

    isLeaf = True

    #---------------------------------------------------------------------------
    def __init__(self, inv, restricted, maxObj, daEnabled,
                 conditionalRequestsEnabled, timeInventoryLoaded):
        BaseResource.__init__(self, VERSION)

        self._inv = inv
        self._allowRestricted = restricted
        self._maxObj = maxObj
        self._daEnabled = daEnabled
        self._conditionalRequestsEnabled = conditionalRequestsEnabled
        self._timeInventoryLoaded = timeInventoryLoaded.seconds()

        # additional object count dependent on detail level
        self._resLevelCount = inv.responsePAZCount() + inv.responseFIRCount() \
            + inv.responsePolynomialCount() + inv.responseIIRCount() \
            + inv.responseFAPCount()
        for i in range(inv.dataloggerCount()):
            self._resLevelCount += inv.datalogger(i).decimationCount()

    #---------------------------------------------------------------------------
    def render_OPTIONS(self, req):
        """Answer an OPTIONS request with the allowed methods and headers."""
        req.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        req.setHeader('Access-Control-Allow-Headers',
                      'Accept, Content-Type, X-Requested-With, Origin')
        req.setHeader('Content-Type', 'text/plain')
        # Twisted requires render methods to return bytes. Under Python 2
        # b"" is the same object type as "".
        return b""

    #---------------------------------------------------------------------------
    def render_GET(self, req):
        """Parse and validate the GET parameters and start processing."""
        ro = _StationRequestOptions()
        try:
            ro.parseGET(req.args)
            ro.parse()
            # the GET operation supports exactly one stream filter
            ro.streams.append(ro)
        except ValueError as e:
            seiscomp.logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)

        return self._prepareRequest(req, ro)

    #---------------------------------------------------------------------------
    def render_POST(self, req):
        """Parse and validate the POST parameters and start processing."""
        ro = _StationRequestOptions()
        try:
            ro.parsePOST(req.content)
            ro.parse()
        except ValueError as e:
            seiscomp.logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)

        return self._prepareRequest(req, ro)

    #---------------------------------------------------------------------------
    def _prepareRequest(self, req, ro):
        """Reject unsupported option combinations and hand the request over
        to a worker thread.

        Returns the rendered error page on failure, else
        ``server.NOT_DONE_YET`` since the response is written by the deferred.
        """
        if ro.availability and not self._daEnabled:
            msg = "including of availability information not supported"
            return self.renderErrorPage(req, http.BAD_REQUEST, msg, ro)

        if ro.updatedAfter:
            msg = "filtering based on update time not supported"
            return self.renderErrorPage(req, http.BAD_REQUEST, msg, ro)

        if ro.matchTimeSeries and not self._daEnabled:
            msg = "filtering based on available time series not supported"
            return self.renderErrorPage(req, http.BAD_REQUEST, msg, ro)

        # load data availability if requested
        dac = None
        if ro.availability or ro.matchTimeSeries:
            dac = Application.Instance().getDACache()
            if dac is None or len(dac.extents()) == 0:
                msg = "no data availabiltiy extent information found"
                return self.renderErrorPage(req, http.NO_CONTENT, msg, ro)

        # Exporter, 'None' is used for text output
        if ro.format in ro.VText:
            if ro.includeRes:
                msg = "response level output not available in text format"
                return self.renderErrorPage(req, http.BAD_REQUEST, msg, ro)
            req.setHeader('Content-Type', 'text/plain')
            d = deferToThread(self._processRequestText, req, ro, dac)
        else:
            exp = Exporter.Create(ro.Exporters[ro.format])
            if exp is None:
                msg = "output format '%s' no available, export module '%s' " \
                      "could not be loaded." % (
                          ro.format, ro.Exporters[ro.format])
                return self.renderErrorPage(req, http.BAD_REQUEST, msg, ro)

            req.setHeader('Content-Type', 'application/xml')
            exp.setFormattedOutput(bool(ro.formatted))
            d = deferToThread(self._processRequestExp, req, ro, exp, dac)

        req.notifyFinish().addErrback(utils.onCancel, d)
        d.addBoth(utils.onFinish, req)

        # The request is handled by the deferred object
        return server.NOT_DONE_YET

    #---------------------------------------------------------------------------
    def _processRequestExp(self, req, ro, exp, dac):
        """Collect the matching inventory and write it through exporter `exp`.

        Runs in a worker thread. Returns False if the client disconnected,
        the object limit check failed or the export failed, True otherwise
        (including the 'no content' and 'not modified' responses).
        """
        if req._disconnected: #pylint: disable=W0212
            return False

        staCount, locCount, chaCount, extCount, objCount = 0, 0, 0, 0, 0

        seiscomp.datamodel.PublicObject.SetRegistrationEnabled(False)
        newInv = seiscomp.datamodel.Inventory()
        dataloggers, sensors, extents = set(), set(), {}

        skipRestricted = not self._allowRestricted or \
            (ro.restricted is not None and not ro.restricted)
        # the time filter is applied to the objects of the requested level
        levelNet = not ro.includeSta
        levelSta = ro.includeSta and not ro.includeCha

        isConditionalRequest = self._isConditionalRequest(req)

        # iterate over inventory networks
        for net in ro.networkIter(self._inv, levelNet):
            if req._disconnected: #pylint: disable=W0212
                return False
            if skipRestricted and utils.isRestricted(net):
                continue
            newNet = seiscomp.datamodel.Network(net)

            # Copy comments
            for i in range(net.commentCount()):
                newNet.add(seiscomp.datamodel.Comment(net.comment(i)))

            # iterate over inventory stations of current network
            for sta in ro.stationIter(net, levelSta):
                if req._disconnected: #pylint: disable=W0212
                    return False
                if skipRestricted and utils.isRestricted(sta):
                    continue
                if not self.checkObjects(req, objCount, self._maxObj):
                    return False

                if ro.includeCha:
                    numCha, numLoc, d, s, e = \
                        self._processStation(newNet, net, sta, ro, dac,
                                             skipRestricted,
                                             isConditionalRequest)
                    if numCha > 0:
                        if isConditionalRequest:
                            self.returnNotModified(req, ro)
                            return True
                        locCount += numLoc
                        chaCount += numCha
                        extCount += len(e)
                        # Add only the objects of this station. Adding the
                        # cumulative extCount here would count the extents
                        # of previous stations over and over again.
                        objCount += numLoc + numCha + len(e)
                        if not self.checkObjects(req, objCount, self._maxObj):
                            return False
                        dataloggers |= d
                        sensors |= s
                        for k, v in e.items():
                            if k not in extents:
                                extents[k] = v

                elif self._matchStation(net, sta, ro, dac):
                    if isConditionalRequest:
                        self.returnNotModified(req, ro)
                        return True
                    if ro.includeSta:
                        newSta = seiscomp.datamodel.Station(sta)
                        # Copy comments
                        for i in range(sta.commentCount()):
                            newSta.add(
                                seiscomp.datamodel.Comment(sta.comment(i)))
                        newNet.add(newSta)
                    else:
                        # no station output requested: one matching station
                        # is sufficient to include the network
                        newInv.add(newNet)
                        objCount += 1
                        break

            if newNet.stationCount() > 0:
                newInv.add(newNet)
                staCount += newNet.stationCount()
                # stations of this network plus the network itself; the
                # cumulative staCount must not be added repeatedly
                objCount += newNet.stationCount() + 1

        # Return 204 if no matching inventory was found
        if newInv.networkCount() == 0:
            msg = "no matching inventory found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return True

        if self._conditionalRequestsEnabled:
            req.setHeader("Last-Modified",
                          http.datetimeToString(self._timeInventoryLoaded))

        # Copy references (dataloggers, responses, sensors)
        decCount, resCount = 0, 0
        if ro.includeCha:
            decCount = self._copyReferences(newInv, req, objCount, self._inv,
                                            ro, dataloggers, sensors,
                                            self._maxObj)
            if decCount is None:
                return False

            resCount = newInv.responsePAZCount() + \
                newInv.responseFIRCount() + \
                newInv.responsePolynomialCount() + \
                newInv.responseFAPCount() + \
                newInv.responseIIRCount()
            objCount += resCount + decCount + newInv.dataloggerCount() + \
                newInv.sensorCount()

        # Copy data extents
        objOut = newInv
        if len(extents) > 0:
            # Only the DataAvailability container is new here, the extents
            # were already counted while the stations were processed.
            objCount += 1
            da = seiscomp.datamodel.DataAvailability()
            for ext in extents.values():
                da.add(seiscomp.datamodel.DataExtent(ext))
            objOut = ExportObjectList()
            objOut.append(newInv)
            objOut.append(da)

        sink = utils.Sink(req)
        if not exp.write(sink, objOut):
            return False

        seiscomp.logging.debug(
            "%s: returned %iNet, %iSta, %iLoc, %iCha, %iDL, %iDec, %iSen, "
            "%iRes, %iDAExt (total objects/bytes: %i/%i)" % (
                ro.service, newInv.networkCount(), staCount, locCount,
                chaCount, newInv.dataloggerCount(), decCount,
                newInv.sensorCount(), resCount, extCount, objCount,
                sink.written))
        utils.accessLog(req, ro, http.OK, sink.written, None)
        return True

    #---------------------------------------------------------------------------
    @staticmethod
    def _formatEpoch(obj):
        """Return the start and end time of `obj` as strings.

        Fractions of a second are only printed if present. An unset end time
        results in an empty string.
        """
        df = "%FT%T"
        dfMS = "%FT%T.%f"

        if obj.start().microseconds() > 0:
            start = obj.start().toString(dfMS)
        else:
            start = obj.start().toString(df)

        try:
            if obj.end().microseconds() > 0:
                end = obj.end().toString(dfMS)
            else:
                end = obj.end().toString(df)
        except ValueError:
            end = ''

        return start, end

    #---------------------------------------------------------------------------
    @staticmethod
    def _optStr(getter):
        """Return str(getter()) or an empty string if the getter raises a
        ValueError, which the accessors used here do for unset attributes.
        """
        try:
            return str(getter())
        except ValueError:
            return ''

    #---------------------------------------------------------------------------
    def _processRequestText(self, req, ro, dac):
        """Write the matching inventory in the FDSN text format.

        Runs in a worker thread. Depending on the requested level one line
        per network, station or channel is generated. The lines are sorted
        by code and start time.
        """
        if req._disconnected: #pylint: disable=W0212
            return False

        skipRestricted = not self._allowRestricted or \
            (ro.restricted is not None and not ro.restricted)
        isConditionalRequest = self._isConditionalRequest(req)

        # list of (sort key, output line) tuples
        lines = []

        # level = network
        if not ro.includeSta:
            header = "#Network|Description|StartTime|EndTime|TotalStations\n"

            # iterate over inventory networks
            for net in ro.networkIter(self._inv, True):
                if req._disconnected: #pylint: disable=W0212
                    return False
                if skipRestricted and utils.isRestricted(net):
                    continue

                # at least one matching station is required
                stationFound = False
                for sta in ro.stationIter(net, False):
                    if req._disconnected: #pylint: disable=W0212
                        return False
                    if self._matchStation(net, sta, ro, dac) and \
                            not (skipRestricted and utils.isRestricted(sta)):
                        stationFound = True
                        break
                if not stationFound:
                    continue
                if isConditionalRequest:
                    self.returnNotModified(req, ro)
                    return True

                start, end = self._formatEpoch(net)
                lines.append(("%s %s" % (net.code(), start),
                              "%s|%s|%s|%s|%i\n" % (
                                  net.code(), net.description(), start, end,
                                  net.stationCount())))

        # level = station
        elif not ro.includeCha:
            header = "#Network|Station|Latitude|Longitude|Elevation|" \
                     "SiteName|StartTime|EndTime\n"

            # iterate over inventory networks
            for net in ro.networkIter(self._inv, False):
                if req._disconnected: #pylint: disable=W0212
                    return False
                if skipRestricted and utils.isRestricted(net):
                    continue

                # iterate over inventory stations
                for sta in ro.stationIter(net, True):
                    if req._disconnected: #pylint: disable=W0212
                        return False
                    if not self._matchStation(net, sta, ro, dac) or (
                            skipRestricted and utils.isRestricted(sta)):
                        continue
                    if isConditionalRequest:
                        self.returnNotModified(req, ro)
                        return True

                    lat = self._optStr(sta.latitude)
                    lon = self._optStr(sta.longitude)
                    elev = self._optStr(sta.elevation)
                    try:
                        desc = sta.description()
                    except ValueError:
                        desc = ''

                    start, end = self._formatEpoch(sta)
                    lines.append(("%s.%s %s" % (net.code(), sta.code(), start),
                                  "%s|%s|%s|%s|%s|%s|%s|%s\n" % (
                                      net.code(), sta.code(), lat, lon, elev,
                                      desc, start, end)))

        # level = channel (response level not supported in text format)
        else:
            header = "#Network|Station|Location|Channel|Latitude|Longitude|" \
                     "Elevation|Depth|Azimuth|Dip|SensorDescription|Scale|" \
                     "ScaleFreq|ScaleUnits|SampleRate|StartTime|EndTime\n"

            # iterate over inventory networks
            for net in ro.networkIter(self._inv, False):
                if req._disconnected: #pylint: disable=W0212
                    return False
                if skipRestricted and utils.isRestricted(net):
                    continue

                # iterate over inventory stations, locations, streams
                for sta in ro.stationIter(net, False):
                    if req._disconnected: #pylint: disable=W0212
                        return False
                    if skipRestricted and utils.isRestricted(sta):
                        continue
                    for loc in ro.locationIter(net, sta, True):
                        for stream in ro.streamIter(net, sta, loc, True, dac):
                            if skipRestricted and utils.isRestricted(stream):
                                continue
                            if isConditionalRequest:
                                self.returnNotModified(req, ro)
                                return True

                            lat = self._optStr(loc.latitude)
                            lon = self._optStr(loc.longitude)
                            elev = self._optStr(loc.elevation)
                            depth = self._optStr(stream.depth)
                            azi = self._optStr(stream.azimuth)
                            dip = self._optStr(stream.dip)

                            desc = ''
                            try:
                                sensor = self._inv.findSensor(stream.sensor())
                                if sensor is not None:
                                    desc = sensor.description()
                            except ValueError:
                                pass

                            scale = self._optStr(stream.gain)
                            scaleFreq = self._optStr(stream.gainFrequency)
                            scaleUnit = self._optStr(stream.gainUnit)

                            try:
                                sr = str(stream.sampleRateNumerator() /
                                         stream.sampleRateDenominator())
                            except (ValueError, ZeroDivisionError):
                                sr = ''

                            start, end = self._formatEpoch(stream)
                            lines.append((
                                "%s.%s.%s.%s %s" % (
                                    net.code(), sta.code(), loc.code(),
                                    stream.code(), start),
                                "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|"
                                "%s|%s|%s|%s|%s|%s\n" % (
                                    net.code(), sta.code(), loc.code(),
                                    stream.code(), lat, lon, elev, depth, azi,
                                    dip, desc, scale, scaleFreq, scaleUnit,
                                    sr, start, end)))

        # Return 204 if no matching inventory was found
        if not lines:
            msg = "no matching inventory found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return False

        # sort lines and append them to the header
        lines.sort(key=lambda line: line[0])
        data = header + "".join(line[1] for line in lines)

        if self._conditionalRequestsEnabled:
            req.setHeader("Last-Modified",
                          http.datetimeToString(self._timeInventoryLoaded))

        dataBin = utils.py3bstr(data)
        utils.writeTSBin(req, dataBin)
        seiscomp.logging.debug("%s: returned %i lines (total bytes: %i)" % (
            ro.service, len(lines), len(dataBin)))
        utils.accessLog(req, ro, http.OK, len(dataBin), None)
        return True

    #---------------------------------------------------------------------------
    def _isConditionalRequest(self, req):
        """Return True if `req` is a time based conditional request which can
        be answered with 'not modified'.

        That is the case for GET/HEAD requests without an If-None-Match
        header whose If-Modified-Since time is not before the time the
        inventory was loaded.
        """
        # support for time based conditional requests
        if not self._conditionalRequestsEnabled:
            return False
        if req.method not in (b'GET', b'HEAD'):
            return False
        if req.getHeader("If-None-Match") is not None:
            return False

        modifiedSince = req.getHeader("If-Modified-Since")
        if not modifiedSince:
            return False

        try:
            modifiedSince = http.stringToDatetime(modifiedSince)
        except ValueError:
            # a malformed date is ignored: serve the request unconditionally
            # instead of failing in the worker thread
            return False

        return bool(modifiedSince and
                    self._timeInventoryLoaded <= modifiedSince)

    #---------------------------------------------------------------------------
    # Checks if at least one location and channel combination matches the
    # request options
    @staticmethod
    def _matchStation(net, sta, ro, dac):
        # No filter: return true immediately
        if dac is None and \
                (not ro.channel or
                 (not ro.channel.loc and not ro.channel.cha)):
            return True

        for loc in ro.locationIter(net, sta, False):
            # a matching location is sufficient if neither a channel nor a
            # time filter has to be evaluated
            if dac is None and not ro.channel.cha and not ro.time:
                return True

            for _ in ro.streamIter(net, sta, loc, False, dac):
                return True

        return False

    #---------------------------------------------------------------------------
    # Adds a deep copy of the specified station to the new network if the
    # location and channel combination matches the request options (if any).
    # Returns the tuple (channel count, location count, datalogger ids,
    # sensor ids, extents) where the ids are sets and extents is a dictionary
    # mapping the public ID to the extent. For a conditional request the first
    # matching channel short-circuits with counts of 1 and empty collections.
    @staticmethod
    def _processStation(newNet, net, sta, ro, dac, skipRestricted,
                        isConditionalRequest):
        chaCount = 0
        dataloggers, sensors, extents = set(), set(), {}
        newSta = seiscomp.datamodel.Station(sta)
        includeAvailability = dac is not None and ro.availability

        # Copy comments
        for i in range(sta.commentCount()):
            newSta.add(seiscomp.datamodel.Comment(sta.comment(i)))

        for loc in ro.locationIter(net, sta, True):
            newLoc = seiscomp.datamodel.SensorLocation(loc)
            # Copy comments
            for i in range(loc.commentCount()):
                newLoc.add(seiscomp.datamodel.Comment(loc.comment(i)))

            for stream in ro.streamIter(net, sta, loc, True, dac):
                if skipRestricted and utils.isRestricted(stream):
                    continue
                if isConditionalRequest:
                    # same container types as the regular return value
                    return 1, 1, set(), set(), {}
                newCha = seiscomp.datamodel.Stream(stream)
                # Copy comments
                for i in range(stream.commentCount()):
                    newCha.add(seiscomp.datamodel.Comment(stream.comment(i)))
                newLoc.add(newCha)
                dataloggers.add(stream.datalogger())
                sensors.add(stream.sensor())
                if includeAvailability:
                    ext = dac.extent(net.code(), sta.code(), loc.code(),
                                     stream.code())
                    if ext is not None and ext.publicID() not in extents:
                        extents[ext.publicID()] = ext

            if newLoc.streamCount() > 0:
                newSta.add(newLoc)
                chaCount += newLoc.streamCount()

        if newSta.sensorLocationCount() > 0:
            newNet.add(newSta)
            return chaCount, newSta.sensorLocationCount(), dataloggers, \
                sensors, extents

        # same container types as the regular return value
        return 0, 0, set(), set(), {}

    #---------------------------------------------------------------------------
    # Copy references (data loggers, sensors, responses) depending on the
    # request options. Returns the number of copied decimations or None if the
    # client disconnected or the object limit check failed.
    def _copyReferences(self, newInv, req, objCount, inv, ro, dataloggers,
                        sensors, maxObj):
        responses = set()
        decCount = 0

        # datalogger
        for i in range(inv.dataloggerCount()):
            if req._disconnected: #pylint: disable=W0212
                return None
            logger = inv.datalogger(i)
            if logger.publicID() not in dataloggers:
                continue
            newLogger = seiscomp.datamodel.Datalogger(logger)
            newInv.add(newLogger)
            # decimations are only needed for responses
            if ro.includeRes:
                for j in range(logger.decimationCount()):
                    decimation = logger.decimation(j)
                    newLogger.add(seiscomp.datamodel.Decimation(decimation))

                    # collect response ids
                    filterStr = ""
                    try:
                        filterStr = \
                            decimation.analogueFilterChain().content() + " "
                    except ValueError:
                        pass
                    try:
                        filterStr += decimation.digitalFilterChain().content()
                    except ValueError:
                        pass
                    responses.update(filterStr.split())
                decCount += newLogger.decimationCount()

        objCount += newInv.dataloggerCount() + decCount
        resCount = len(responses)
        if not self.checkObjects(req, objCount + resCount, maxObj):
            return None

        # sensor
        for i in range(inv.sensorCount()):
            if req._disconnected: #pylint: disable=W0212
                return None
            sensor = inv.sensor(i)
            if sensor.publicID() not in sensors:
                continue
            newSensor = seiscomp.datamodel.Sensor(sensor)
            newInv.add(newSensor)
            resp = newSensor.response()
            if resp:
                if ro.includeRes:
                    responses.add(resp)
                else:
                    # no responses: remove response reference to avoid missing
                    # response warning of exporter
                    newSensor.setResponse("")

        objCount += newInv.sensorCount()
        resCount = len(responses)
        if not self.checkObjects(req, objCount + resCount, maxObj):
            return None

        # responses
        if ro.includeRes:
            # (count function, accessor, copy constructor) per response type
            responseTypes = (
                (inv.responsePAZCount, inv.responsePAZ,
                 seiscomp.datamodel.ResponsePAZ),
                (inv.responseFIRCount, inv.responseFIR,
                 seiscomp.datamodel.ResponseFIR),
                (inv.responsePolynomialCount, inv.responsePolynomial,
                 seiscomp.datamodel.ResponsePolynomial),
                (inv.responseFAPCount, inv.responseFAP,
                 seiscomp.datamodel.ResponseFAP),
                (inv.responseIIRCount, inv.responseIIR,
                 seiscomp.datamodel.ResponseIIR),
            )
            for count, get, clone in responseTypes:
                if req._disconnected: #pylint: disable=W0212
                    return None
                for i in range(count()):
                    resp = get(i)
                    if resp.publicID() in responses:
                        newInv.add(clone(resp))

        return decCount


# vim: ts=4 et