2018-09-05 15:52:38 -06:00
|
|
|
#
|
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
# or more contributor license agreements. See the NOTICE file
|
|
|
|
# distributed with this work for additional information
|
|
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
|
|
# to you under the Apache License, Version 2.0 (the
|
|
|
|
# "License"); you may not use this file except in compliance
|
|
|
|
# with the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing,
|
|
|
|
# software distributed under the License is distributed on an
|
|
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
|
# KIND, either express or implied. See the License for the
|
|
|
|
# specific language governing permissions and limitations
|
|
|
|
# under the License.
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
|
|
import logging
|
2023-08-23 13:46:12 -06:00
|
|
|
|
|
|
|
from multiprocessing import Process, Value, Condition
|
2018-09-05 15:52:38 -06:00
|
|
|
|
|
|
|
from .TServer import TServer
|
|
|
|
from thrift.transport.TTransport import TTransportException
|
2023-08-23 13:46:12 -06:00
|
|
|
|
|
|
|
# Module-level logger; the server below uses it to record exceptions
# raised while forking workers, accepting clients, and serving requests.
logger = logging.getLogger(__name__)
|
2018-09-05 15:52:38 -06:00
|
|
|
|
|
|
|
|
|
|
|
class TProcessPoolServer(TServer):
    """Server with a fixed size pool of worker subprocesses to service requests

    Each worker is a ``multiprocessing.Process`` that accepts connections
    from the shared server transport and serves them one at a time.

    Note that if you need shared state between the handlers - it's up to you!
    Written by Dvir Volk, doat.com
    """

    def __init__(self, *args):
        """Initialise the base server and the pool bookkeeping.

        All positional arguments are forwarded unchanged to
        ``TServer.__init__``.
        """
        TServer.__init__(self, *args)
        # Number of worker processes forked by serve().
        self.numWorkers = 10
        # Process objects started by serve().
        self.workers = []
        # Shared flag polled by the workers; False tells them to exit.
        self.isRunning = Value('b', False)
        # serve() blocks on this condition until stop() notifies it.
        self.stopCondition = Condition()
        # Optional callable run once in each worker before it accepts.
        self.postForkCallback = None

    def __getstate__(self):
        """Return the instance state with the worker list blanked out.

        The returned dict is a shallow copy of ``__dict__`` whose
        ``'workers'`` entry is ``None``.
        """
        # NOTE(review): presumably needed because Process handles cannot be
        # pickled when the server is shipped to a child process - confirm.
        state = self.__dict__.copy()
        state['workers'] = None
        return state

    def setPostForkCallback(self, callback):
        """Register a callable that each worker runs before serving.

        Raises:
            TypeError: if ``callback`` is not callable.
        """
        if not callable(callback):
            raise TypeError("This is not a callback!")
        self.postForkCallback = callback

    def setNumWorkers(self, num):
        """Set the number of worker processes that should be created"""
        self.numWorkers = num

    def workerProcess(self):
        """Accept clients from the server transport and serve them.

        Runs the post-fork callback (if any), then loops while the shared
        ``isRunning`` flag is true.  Returns 0 when interrupted by
        ``KeyboardInterrupt`` or ``SystemExit``; any other exception is
        logged and the loop continues.
        """
        if self.postForkCallback:
            self.postForkCallback()

        while self.isRunning.value:
            try:
                client = self.serverTransport.accept()
                if not client:
                    # Nothing was accepted; re-check the running flag.
                    continue
                self.serveClient(client)
            except (KeyboardInterrupt, SystemExit):
                return 0
            except Exception as x:
                logger.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible

        Requests are processed in a loop until the processor raises.  A
        ``TTransportException`` ends the loop silently; any other
        ``Exception`` is logged.  Both transports are closed on every exit
        path.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            pass
        except Exception as x:
            logger.exception(x)
        finally:
            # Close in finally so an interrupt (KeyboardInterrupt/SystemExit)
            # raised mid-request does not leave the transports open.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start the workers and block until stop() is called.

        Sets the shared running flag, starts listening on the server
        transport, forks ``numWorkers`` daemon processes and then waits on
        ``stopCondition``.  The running flag is cleared before returning.
        """
        # this is a shared state that can tell the workers to exit when False
        self.isRunning.value = True

        # first bind and listen to the port
        self.serverTransport.listen()

        # fork the children
        for _ in range(self.numWorkers):
            try:
                w = Process(target=self.workerProcess)
                w.daemon = True
                w.start()
                self.workers.append(w)
            except Exception as x:
                logger.exception(x)

        # wait until the condition is set by stop()
        while True:
            # The with-block releases the condition's lock on every exit
            # path, so a later stop() cannot block on a lock left held here.
            with self.stopCondition:
                try:
                    self.stopCondition.wait()
                    break
                except (SystemExit, KeyboardInterrupt):
                    break
                except Exception as x:
                    logger.exception(x)

        self.isRunning.value = False

    def stop(self):
        """Clear the running flag and wake the process blocked in serve()."""
        self.isRunning.value = False
        with self.stopCondition:
            self.stopCondition.notify()
|