mirror of
https://github.com/Unidata/python-awips.git
synced 2025-02-24 06:57:56 -05:00
119 lines
3.9 KiB
Python
119 lines
3.9 KiB
Python
|
#
|
||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||
|
# or more contributor license agreements. See the NOTICE file
|
||
|
# distributed with this work for additional information
|
||
|
# regarding copyright ownership. The ASF licenses this file
|
||
|
# to you under the Apache License, Version 2.0 (the
|
||
|
# "License"); you may not use this file except in compliance
|
||
|
# with the License. You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing,
|
||
|
# software distributed under the License is distributed on an
|
||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||
|
# KIND, either express or implied. See the License for the
|
||
|
# specific language governing permissions and limitations
|
||
|
# under the License.
|
||
|
#
|
||
|
|
||
|
|
||
|
import logging
|
||
|
from multiprocessing import Process, Value, Condition, reduction
|
||
|
|
||
|
from TServer import TServer
|
||
|
from thrift.transport.TTransport import TTransportException
|
||
|
|
||
|
|
||
|
class TProcessPoolServer(TServer):
|
||
|
"""Server with a fixed size pool of worker subprocesses to service requests
|
||
|
|
||
|
Note that if you need shared state between the handlers - it's up to you!
|
||
|
Written by Dvir Volk, doat.com
|
||
|
"""
|
||
|
def __init__(self, *args):
|
||
|
TServer.__init__(self, *args)
|
||
|
self.numWorkers = 10
|
||
|
self.workers = []
|
||
|
self.isRunning = Value('b', False)
|
||
|
self.stopCondition = Condition()
|
||
|
self.postForkCallback = None
|
||
|
|
||
|
def setPostForkCallback(self, callback):
|
||
|
if not callable(callback):
|
||
|
raise TypeError("This is not a callback!")
|
||
|
self.postForkCallback = callback
|
||
|
|
||
|
def setNumWorkers(self, num):
|
||
|
"""Set the number of worker threads that should be created"""
|
||
|
self.numWorkers = num
|
||
|
|
||
|
def workerProcess(self):
|
||
|
"""Loop getting clients from the shared queue and process them"""
|
||
|
if self.postForkCallback:
|
||
|
self.postForkCallback()
|
||
|
|
||
|
while self.isRunning.value:
|
||
|
try:
|
||
|
client = self.serverTransport.accept()
|
||
|
self.serveClient(client)
|
||
|
except (KeyboardInterrupt, SystemExit):
|
||
|
return 0
|
||
|
except Exception as x:
|
||
|
logging.exception(x)
|
||
|
|
||
|
def serveClient(self, client):
|
||
|
"""Process input/output from a client for as long as possible"""
|
||
|
itrans = self.inputTransportFactory.getTransport(client)
|
||
|
otrans = self.outputTransportFactory.getTransport(client)
|
||
|
iprot = self.inputProtocolFactory.getProtocol(itrans)
|
||
|
oprot = self.outputProtocolFactory.getProtocol(otrans)
|
||
|
|
||
|
try:
|
||
|
while True:
|
||
|
self.processor.process(iprot, oprot)
|
||
|
except TTransportException, tx:
|
||
|
pass
|
||
|
except Exception as x:
|
||
|
logging.exception(x)
|
||
|
|
||
|
itrans.close()
|
||
|
otrans.close()
|
||
|
|
||
|
def serve(self):
|
||
|
"""Start workers and put into queue"""
|
||
|
# this is a shared state that can tell the workers to exit when False
|
||
|
self.isRunning.value = True
|
||
|
|
||
|
# first bind and listen to the port
|
||
|
self.serverTransport.listen()
|
||
|
|
||
|
# fork the children
|
||
|
for i in range(self.numWorkers):
|
||
|
try:
|
||
|
w = Process(target=self.workerProcess)
|
||
|
w.daemon = True
|
||
|
w.start()
|
||
|
self.workers.append(w)
|
||
|
except Exception, x:
|
||
|
logging.exception(x)
|
||
|
|
||
|
# wait until the condition is set by stop()
|
||
|
while True:
|
||
|
self.stopCondition.acquire()
|
||
|
try:
|
||
|
self.stopCondition.wait()
|
||
|
break
|
||
|
except (SystemExit, KeyboardInterrupt):
|
||
|
break
|
||
|
except Exception as x:
|
||
|
logging.exception(x)
|
||
|
|
||
|
self.isRunning.value = False
|
||
|
|
||
|
def stop(self):
|
||
|
self.isRunning.value = False
|
||
|
self.stopCondition.acquire()
|
||
|
self.stopCondition.notify()
|
||
|
self.stopCondition.release()
|