mirror of
https://github.com/Unidata/python-awips.git
synced 2025-02-24 06:57:56 -05:00
249 lines
7.9 KiB
Python
249 lines
7.9 KiB
Python
|
#
|
||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||
|
# or more contributor license agreements. See the NOTICE file
|
||
|
# distributed with this work for additional information
|
||
|
# regarding copyright ownership. The ASF licenses this file
|
||
|
# to you under the Apache License, Version 2.0 (the
|
||
|
# "License"); you may not use this file except in compliance
|
||
|
# with the License. You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing,
|
||
|
# software distributed under the License is distributed on an
|
||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||
|
# KIND, either express or implied. See the License for the
|
||
|
# specific language governing permissions and limitations
|
||
|
# under the License.
|
||
|
#
|
||
|
|
||
|
"""TZlibTransport provides a compressed transport and transport factory
|
||
|
class, using the python standard library zlib module to implement
|
||
|
data compression.
|
||
|
"""
|
||
|
|
||
|
|
||
|
import zlib
|
||
|
from io import StringIO
|
||
|
from .TTransport import TTransportBase, CReadableTransport
|
||
|
|
||
|
|
||
|
class TZlibTransportFactory(object):
|
||
|
"""Factory transport that builds zlib compressed transports.
|
||
|
|
||
|
This factory caches the last single client/transport that it was passed
|
||
|
and returns the same TZlibTransport object that was created.
|
||
|
|
||
|
This caching means the TServer class will get the _same_ transport
|
||
|
object for both input and output transports from this factory.
|
||
|
(For non-threaded scenarios only, since the cache only holds one object)
|
||
|
|
||
|
The purpose of this caching is to allocate only one TZlibTransport where
|
||
|
only one is really needed (since it must have separate read/write buffers),
|
||
|
and makes the statistics from getCompSavings() and getCompRatio()
|
||
|
easier to understand.
|
||
|
"""
|
||
|
# class scoped cache of last transport given and zlibtransport returned
|
||
|
_last_trans = None
|
||
|
_last_z = None
|
||
|
|
||
|
def getTransport(self, trans, compresslevel=9):
|
||
|
"""Wrap a transport, trans, with the TZlibTransport
|
||
|
compressed transport class, returning a new
|
||
|
transport to the caller.
|
||
|
|
||
|
@param compresslevel: The zlib compression level, ranging
|
||
|
from 0 (no compression) to 9 (best compression). Defaults to 9.
|
||
|
@type compresslevel: int
|
||
|
|
||
|
This method returns a TZlibTransport which wraps the
|
||
|
passed C{trans} TTransport derived instance.
|
||
|
"""
|
||
|
if trans == self._last_trans:
|
||
|
return self._last_z
|
||
|
ztrans = TZlibTransport(trans, compresslevel)
|
||
|
self._last_trans = trans
|
||
|
self._last_z = ztrans
|
||
|
return ztrans
|
||
|
|
||
|
|
||
|
class TZlibTransport(TTransportBase, CReadableTransport):
|
||
|
"""Class that wraps a transport with zlib, compressing writes
|
||
|
and decompresses reads, using the python standard
|
||
|
library zlib module.
|
||
|
"""
|
||
|
# Read buffer size for the python fastbinary C extension,
|
||
|
# the TBinaryProtocolAccelerated class.
|
||
|
DEFAULT_BUFFSIZE = 4096
|
||
|
|
||
|
def __init__(self, trans, compresslevel=9):
|
||
|
"""Create a new TZlibTransport, wrapping C{trans}, another
|
||
|
TTransport derived object.
|
||
|
|
||
|
@param trans: A thrift transport object, i.e. a TSocket() object.
|
||
|
@type trans: TTransport
|
||
|
@param compresslevel: The zlib compression level, ranging
|
||
|
from 0 (no compression) to 9 (best compression). Default is 9.
|
||
|
@type compresslevel: int
|
||
|
"""
|
||
|
self.__trans = trans
|
||
|
self.compresslevel = compresslevel
|
||
|
self.__rbuf = StringIO()
|
||
|
self.__wbuf = StringIO()
|
||
|
self._init_zlib()
|
||
|
self._init_stats()
|
||
|
|
||
|
def _reinit_buffers(self):
|
||
|
"""Internal method to initialize/reset the internal StringIO objects
|
||
|
for read and write buffers.
|
||
|
"""
|
||
|
self.__rbuf = StringIO()
|
||
|
self.__wbuf = StringIO()
|
||
|
|
||
|
def _init_stats(self):
|
||
|
"""Internal method to reset the internal statistics counters
|
||
|
for compression ratios and bandwidth savings.
|
||
|
"""
|
||
|
self.bytes_in = 0
|
||
|
self.bytes_out = 0
|
||
|
self.bytes_in_comp = 0
|
||
|
self.bytes_out_comp = 0
|
||
|
|
||
|
def _init_zlib(self):
|
||
|
"""Internal method for setting up the zlib compression and
|
||
|
decompression objects.
|
||
|
"""
|
||
|
self._zcomp_read = zlib.decompressobj()
|
||
|
self._zcomp_write = zlib.compressobj(self.compresslevel)
|
||
|
|
||
|
def getCompRatio(self):
|
||
|
"""Get the current measured compression ratios (in,out) from
|
||
|
this transport.
|
||
|
|
||
|
Returns a tuple of:
|
||
|
(inbound_compression_ratio, outbound_compression_ratio)
|
||
|
|
||
|
The compression ratios are computed as:
|
||
|
compressed / uncompressed
|
||
|
|
||
|
E.g., data that compresses by 10x will have a ratio of: 0.10
|
||
|
and data that compresses to half of ts original size will
|
||
|
have a ratio of 0.5
|
||
|
|
||
|
None is returned if no bytes have yet been processed in
|
||
|
a particular direction.
|
||
|
"""
|
||
|
r_percent, w_percent = (None, None)
|
||
|
if self.bytes_in > 0:
|
||
|
r_percent = self.bytes_in_comp / self.bytes_in
|
||
|
if self.bytes_out > 0:
|
||
|
w_percent = self.bytes_out_comp / self.bytes_out
|
||
|
return (r_percent, w_percent)
|
||
|
|
||
|
def getCompSavings(self):
|
||
|
"""Get the current count of saved bytes due to data
|
||
|
compression.
|
||
|
|
||
|
Returns a tuple of:
|
||
|
(inbound_saved_bytes, outbound_saved_bytes)
|
||
|
|
||
|
Note: if compression is actually expanding your
|
||
|
data (only likely with very tiny thrift objects), then
|
||
|
the values returned will be negative.
|
||
|
"""
|
||
|
r_saved = self.bytes_in - self.bytes_in_comp
|
||
|
w_saved = self.bytes_out - self.bytes_out_comp
|
||
|
return (r_saved, w_saved)
|
||
|
|
||
|
def isOpen(self):
|
||
|
"""Return the underlying transport's open status"""
|
||
|
return self.__trans.isOpen()
|
||
|
|
||
|
def open(self):
|
||
|
"""Open the underlying transport"""
|
||
|
self._init_stats()
|
||
|
return self.__trans.open()
|
||
|
|
||
|
def listen(self):
|
||
|
"""Invoke the underlying transport's listen() method"""
|
||
|
self.__trans.listen()
|
||
|
|
||
|
def accept(self):
|
||
|
"""Accept connections on the underlying transport"""
|
||
|
return self.__trans.accept()
|
||
|
|
||
|
def close(self):
|
||
|
"""Close the underlying transport,"""
|
||
|
self._reinit_buffers()
|
||
|
self._init_zlib()
|
||
|
return self.__trans.close()
|
||
|
|
||
|
def read(self, sz):
|
||
|
"""Read up to sz bytes from the decompressed bytes buffer, and
|
||
|
read from the underlying transport if the decompression
|
||
|
buffer is empty.
|
||
|
"""
|
||
|
ret = self.__rbuf.read(sz)
|
||
|
if len(ret) > 0:
|
||
|
return ret
|
||
|
# keep reading from transport until something comes back
|
||
|
while True:
|
||
|
if self.readComp(sz):
|
||
|
break
|
||
|
ret = self.__rbuf.read(sz)
|
||
|
return ret
|
||
|
|
||
|
def readComp(self, sz):
|
||
|
"""Read compressed data from the underlying transport, then
|
||
|
decompress it and append it to the internal StringIO read buffer
|
||
|
"""
|
||
|
zbuf = self.__trans.read(sz)
|
||
|
zbuf = self._zcomp_read.unconsumed_tail + zbuf
|
||
|
buf = self._zcomp_read.decompress(zbuf)
|
||
|
self.bytes_in += len(zbuf)
|
||
|
self.bytes_in_comp += len(buf)
|
||
|
old = self.__rbuf.read()
|
||
|
self.__rbuf = StringIO(old + buf)
|
||
|
if len(old) + len(buf) == 0:
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
def write(self, buf):
|
||
|
"""Write some bytes, putting them into the internal write
|
||
|
buffer for eventual compression.
|
||
|
"""
|
||
|
self.__wbuf.write(buf)
|
||
|
|
||
|
def flush(self):
|
||
|
"""Flush any queued up data in the write buffer and ensure the
|
||
|
compression buffer is flushed out to the underlying transport
|
||
|
"""
|
||
|
wout = self.__wbuf.getvalue()
|
||
|
if len(wout) > 0:
|
||
|
zbuf = self._zcomp_write.compress(wout)
|
||
|
self.bytes_out += len(wout)
|
||
|
self.bytes_out_comp += len(zbuf)
|
||
|
else:
|
||
|
zbuf = ''
|
||
|
ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH)
|
||
|
self.bytes_out_comp += len(ztail)
|
||
|
if (len(zbuf) + len(ztail)) > 0:
|
||
|
self.__wbuf = StringIO()
|
||
|
self.__trans.write(zbuf + ztail)
|
||
|
self.__trans.flush()
|
||
|
|
||
|
@property
|
||
|
def cstringio_buf(self):
|
||
|
"""Implement the CReadableTransport interface"""
|
||
|
return self.__rbuf
|
||
|
|
||
|
def cstringio_refill(self, partialread, reqlen):
|
||
|
"""Implement the CReadableTransport interface for refill"""
|
||
|
retstring = partialread
|
||
|
if reqlen < self.DEFAULT_BUFFSIZE:
|
||
|
retstring += self.read(self.DEFAULT_BUFFSIZE)
|
||
|
while len(retstring) < reqlen:
|
||
|
retstring += self.read(reqlen - len(retstring))
|
||
|
self.__rbuf = StringIO(retstring)
|
||
|
return self.__rbuf
|