Source code for nifi_deploy.nifi

"""
Provides a NifiInstance class facilitating easy to use
methods utilizing the NiPyApi (https://github.com/Chaffelson/nipyapi)
wrapper library.
"""

import os
import sys
import getpass

from nipyapi import nifi, config, templates, canvas

# Disable urllib3 certificate warnings
from requests.packages.urllib3 import disable_warnings
disable_warnings()


[docs]class NifiInstance: """ The NifiInstance class facilitating easy to use methods utilizing the NiPyApi (https://github.com/Chaffelson/nipyapi) wrapper library. Arguments: url (str): Nifi host url, defaults to environment variable `NIFI_HOST`. username (str): Nifi username, defaults to environment variable `NIFI_USERNAME`. password (str): Nifi password, defaults to environment variable `NIFI_PASSWORD`. verify_ssl (bool): Whether to verify SSL connection - UNUSED as of now. """ def __init__(self, url=None, username=None, password=None, verify_ssl=False): config.nifi_config.host = self._get_url(url) config.nifi_config.verify_ssl = verify_ssl config.nifi_config.username = username self._authenticate(username, password) def _get_url(self, url): if not url: try: url = os.environ['NIFI_HOST'] except KeyError: url = input('Nifi host: ') if not '/nifi-api' in url: if not url[-1] == '/': url = url + '/' url = url + 'nifi-api' return url def _authenticate(self, username=None, password=None): if not username: try: config.nifi_config.username = os.environ['NIFI_USERNAME'] except KeyError: config.nifi_config.username = input('Username: ') if not password: try: password = os.environ['NIFI_PASSWORD'] except KeyError: password = getpass.getpass('Password: ') access_token = None try: access_token = nifi.AccessApi().create_access_token(username=config.nifi_config.username,password=password) except nifi.rest.ApiException as e: print('Exception when calling AccessApi->create_access_token: %s\n'.format(e)) config.nifi_config.api_key[username] = access_token config.nifi_config.api_client = nifi.ApiClient(header_name='Authorization', header_value='Bearer {}'.format(access_token))
[docs] def create_template(self, pg_id, name, desc=''): """ Create a template from process group id. Arguments: pg_id (str): Process group ID to create the template from. name (str): Name of the template to create. desc (str): Optional, description of the template to create. Returns: nipyapi.nifi.TemplateEntity """ try: obj = templates.create_template( pg_id=pg_id, name=name, desc=desc ) except nifi.rest.ApiException as e: print(e.body) return None return obj.template
[docs] def delete_template(self, template_id): """ Delete a template from Nifi template registry. Arguments: template_id (str): ID of the template to delete. Returns: None """ templates.delete_template(template_id)
[docs] def export_template(self, template_id, file_path=None): """ Export a template as XML, and optionally write it to a file or stdout. Arguments: template_id (str): ID of the template to export. file_path (str): Optional, path of file to write the XML to. Returns: String (template xml) """ template = templates.get_template(template_id, 'id').template output = 'file' if file_path else 'string' content = templates.export_template(template.id, output=output, file_path=file_path) return content
[docs] def import_template(self, file_path): """ Imports a template XML into Nifi's template store. Arguments: file_path (str): Path of the XML file to import into Nifi as a template. Returns: None """ canvas_id = canvas.get_root_pg_id() templates.upload_template(canvas_id, file_path)