Source code for bspump.application
import logging
import signal
import sys
import os
from bspump.asab import Application, Config
import bspump.asab.web
import bspump.asab.api
from .service import BSPumpService
from .__version__ import __version__, __build__
L = logging.getLogger(__name__)
[docs]
class BSPumpApplication(Application):
"""
Description: BSPumpApplication is **class** used for .....
:return:
"""
[docs]
def __init__(self, args=None):
super().__init__(args=args)
# Banner
print("BitSwan BSPump version {}".format(__version__))
from bspump.asab.proactor import Module
self.add_module(Module)
from bspump.asab.metrics import Module
self.add_module(Module)
self.ASABApiService = bspump.asab.api.ApiService(self)
self.PumpService = BSPumpService(self)
self.WebContainer = None
from bspump.asab.alert import AlertService
self.AlertService = AlertService(self)
self.MQTTService = None
try:
# Signals are not available on Windows
self.Loop.add_signal_handler(signal.SIGUSR1, self._on_signal_usr1)
except (NotImplementedError, AttributeError):
pass
# Register bspump API endpoints, if requested (the web service is present)
if "web" in Config and Config["web"].get("listen"):
# Initialize API service
self.add_module(bspump.asab.web.Module)
self.WebService = self.get_service("asab.WebService")
from .web import initialize_web
self.WebContainer = initialize_web(self.WebService.WebContainer)
self.ASABApiService.initialize_web()
mqtt_username = os.environ.get("MQTT_USERNAME")
mqtt_password = os.environ.get("MQTT_PASSWORD")
mqtt_broker_url = os.environ.get("MQTT_BROKER_URL")
if mqtt_broker_url and self.DeploymentId and (mqtt_username or mqtt_password):
from .mqtt import MQTTService, MQTTConnection
self.PumpService.add_connection(
MQTTConnection(
self,
"MQTTServiceConnection",
{
"username": mqtt_username,
"password": mqtt_password,
"broker": mqtt_broker_url,
},
)
)
self.MQTTService = MQTTService(self, connection="MQTTServiceConnection")
# Initialize zookeeper container
if "zookeeper" in Config.sections():
from bspump.asab.zookeeper import Module
self.add_module(Module)
self.ASABApiService.initialize_zookeeper()
[docs]
def create_argument_parser(self):
"""
Description:
:return:
"""
prog = sys.argv[0]
if prog[-11:] == "__main__.py":
prog = sys.executable + " -m bspump"
description = """
BSPump is a stream processor. It is a part of BitSwan.
For more information, visit: https://github.com/LibertyAces/BitSwanPump
version: {}
build: {} [{}]
""".format(__version__, __build__, __build__[:7])
parser = super().create_argument_parser(prog=prog, description=description)
parser.add_argument(
"--test", action="store_true", help="Run pipeline/automation tests"
)
parser.add_argument(
"notebook", nargs="?", default=None, help="Jupyter notebook"
)
# add watch argument that watches the notebooks for changes and restarts if needed
parser.add_argument(
"--watch",
action="store_true",
help="Watch the notebook for changes and restarts if needed",
)
return parser
[docs]
def parse_arguments(self, args=None):
args = super().parse_arguments(args=args)
self.Test = args.test
self.Notebook = args.notebook
if self.Notebook is None:
self.Notebook = os.environ.get("JUPYTER_NOTEBOOK", "pipelines/main.ipynb")
self.Watch = args.watch
[docs]
async def main(self):
print("{} pipeline(s) ready.".format(len(self.PumpService.Pipelines)))
if self.Watch:
from .watch import Watcher
Watcher([self.Notebook])
def _on_signal_usr1(self):
"""
Description:
:return:
:hint: To clear reset from all pipelines, run
$ kill -SIGUSR1 xxxx
Equivalently, you can use `docker kill -s SIGUSR1 ....` to reset containerized BSPump.
"""
# Reset errors from all pipelines
for pipeline in self.PumpService.Pipelines.values():
if not pipeline.is_error():
continue # Focus only on pipelines that has errors
pipeline.set_error(None, None, None)