openwebrxplus/owrx/storage.py

121 lines
4.3 KiB
Python

from owrx.config.core import CoreConfig
from owrx.config import Config
from datetime import datetime
import threading
import subprocess
import os
import re
import logging
logger = logging.getLogger(__name__)
class Storage(object):
sharedInstance = None
creationLock = threading.Lock()
filePattern = r'[A-Z0-9]+-[0-9]+-[0-9]+(-[0-9]+)?(-[0-9]+)?\.(bmp|png|txt|mp3)'
# Get shared instance of Storage class
@staticmethod
def getSharedInstance():
with Storage.creationLock:
if Storage.sharedInstance is None:
Storage.sharedInstance = Storage()
return Storage.sharedInstance
# Construct an instance of Storage class
def __init__(self):
self.lock = threading.Lock()
# Create stored file by name, modifying the name if the file exists
def newFile(self, fileName: str, buffering: int = -1):
filePath = self.getFilePath(fileName)
with self.lock:
if not os.path.exists(filePath):
return open(filePath, "wb", buffering = buffering)
elif "." in filePath:
filePathX = "-{0}.".join(filePath.rsplit(".", 1))
for i in range(99):
filePath1 = filePathX.format(i + 1)
if not os.path.exists(filePath1):
return open(filePath1, "wb", buffering = buffering)
raise FileExistsError("File '{0}' already exists.".format(filePath))
# Delete stored file by name, the name must match pattern
def deleteFile(self, fileName: str):
if re.match(self.filePattern, fileName):
filePath = self.getFilePath(fileName)
logger.info("Deleting '{0}'.".format(filePath))
with self.lock:
try:
os.unlink(filePath)
except Exception as e:
logger.debug("deleteFile(): " + str(e))
# Get list of stored files, sorted in reverse alphabetic order
# (so that newer files appear first)
def getStoredFiles(self):
dir = CoreConfig().get_temporary_directory()
with self.lock:
files = [os.path.join(dir, f) for f in os.listdir(dir) if re.match(self.filePattern, f)]
files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
return [os.path.basename(f) for f in files]
# Delete all stored files except for <keep_files> newest ones
def cleanStoredFiles(self):
pm = Config.get()
keep = pm["keep_files"]
dir = CoreConfig().get_temporary_directory()
with self.lock:
files = [os.path.join(dir, f) for f in os.listdir(dir) if re.match(self.filePattern, f)]
files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
for f in files[keep:]:
logger.debug("Deleting stored file '%s'." % os.path.basename(f))
try:
os.unlink(f)
except Exception as e:
logger.debug("cleanStoredFiles(): " + str(e))
# Get file name pattern
@staticmethod
def getNamePattern():
return Storage.filePattern
# Get complete path to a stored file from its filename by
# adding folder name
@staticmethod
def getFilePath(filename: str):
return os.path.join(CoreConfig().get_temporary_directory(), filename)
# Create stored file name by inserting current UTC date
# and time into the pattern spot designated with "{0}"
@staticmethod
def makeFileName(pattern: str = '{0}', frequency: int = 0):
d = datetime.utcnow().strftime('%y%m%d-%H%M%S')
f = ('-%d' % (frequency // 1000)) if frequency>0 else ''
return pattern.format(d + f)
# Convert given file from BMP to PNG format using ImageMagick
@staticmethod
def convertImage(inFile: str):
# Adds storage path
if not inFile.startswith('/'):
inFile = self.getFilePath(inFile)
# Only converting BMP files for now
outFile = re.sub(r'\.bmp$', '.png', inFile)
if outFile==inFile:
return
try:
# Use ImageMagick to convert file
params = ['convert', inFile, outFile]
subprocess.check_call(params)
# If conversion was successful, delete original file
os.unlink(inFile)
except Exception as e:
logger.debug("convertImage(): " + str(e))