Project

General

Profile

Download (9.98 KB) Statistics
| Branch: | Tag: | Revision:
#
# Copyright 2013 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public
# License as published by the Free Software Foundation; either version
# 2 of the License (GPLv2) or (at your option) any later version.
# There is NO WARRANTY for this software, express or implied,
# including the implied warranties of MERCHANTABILITY,
# NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should
# have received a copy of GPLv2 along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#

"""
The katello virtual agent.
Provides content management APIs for pulp within the RHSM environment.
"""
import os
import sys

sys.path.append('/usr/share/rhsm')

from time import sleep
from logging import getLogger
from subprocess import Popen

from six.moves import http_client as http

from katello.constants import REPOSITORY_PATH

from gofer.decorators import load, unload, remote, action, FORK
from gofer.agent.plugin import Plugin
from gofer.pmon import PathMonitor
from gofer.config import Config

try:
from subscription_manager.identity import ConsumerIdentity
except ImportError:
from subscription_manager.certlib import ConsumerIdentity

from rhsm.connection import UEPConnection, RemoteServerException, GoneException

from katello.agent.pulp import Dispatcher
from katello.enabled_report import EnabledReport
from katello.repos import upload_enabled_repos_report


# This plugin
plugin = Plugin.find(__name__)

# Path monitoring
path_monitor = None

# Track registration status
registered = False


log = getLogger(__name__)


RHSM_CONFIG_PATH = '/etc/rhsm/rhsm.conf'


@load
def plugin_loaded():
"""
Initialize the plugin.
Called (once) immediately after the plugin is loaded.
- setup path monitoring.
- validate registration. If registered:
- setup plugin configuration.
"""
global path_monitor
path = ConsumerIdentity.certpath()
path_monitor = PathMonitor()
path_monitor.add(path, certificate_changed)
path_monitor.add(REPOSITORY_PATH, send_enabled_report)
path_monitor.start()
while True:
try:
validate_registration()
if registered:
update_settings()
# DONE
break
except Exception as e:
log.warn(str(e))
sleep(60)


@unload
def plugin_unloaded():
"""
The plugin has been uploaded.
"""
path_monitor.abort()


def bundle(certificate):
"""
Bundle the key and cert and write to a file.
:param certificate: A consumer identity certificate.
:type certificate: ConsumerIdentity
:return: The path to written bundle.
:rtype: str
"""
path = os.path.join(certificate.PATH, 'bundle.pem')
fp = open(path, 'w')
try:
fp.write(certificate.key)
fp.write(certificate.cert)
return path
finally:
fp.close()


def certificate_changed(path):
"""
A certificate change has been detected.
On registration: setup the plugin; attach to the message broker.
On un-registration: detach from the message broker.
:param path: The path to the file that changed.
:type path: str
"""
log.info('changed: %s', path)
while True:
try:
validate_registration()
if registered:
update_settings()
send_enabled_report()
plugin.attach()
else:
plugin.detach()
# DONE
break
except Exception as e:
log.warn(str(e))
sleep(60)


def send_enabled_report(path=REPOSITORY_PATH):
report = EnabledReport(path)
upload_enabled_repos_report(report)


def update_settings():
"""
Setup the plugin based on the RHSM configuration.
"""
rhsm_conf = Config(RHSM_CONFIG_PATH)
certificate = ConsumerIdentity.read()
if 'ca_cert_dir' in rhsm_conf['rhsm']:
ca_cert_dir = rhsm_conf['rhsm']['ca_cert_dir']
else:
#handle old subscription-manager configurations
ca_cert_dir = rhsm_conf['server']['ca_cert_dir']

# the 'katello-default-ca.pem' is the ca used for generating the CA certs.
# the 'candlepin-local.pem' is there for compatibility reasons (the old path where the
# legacy installer was putting this file. If none of them is present, there is still
# a chance the rhsm_conf['rhsm']['repo_ca_cert'] is serving as the CA for issuing
# the client certs
ca_candidates = [os.path.join(ca_cert_dir, 'katello-default-ca.pem'),
os.path.join(ca_cert_dir, 'candlepin-local.pem'),
rhsm_conf['rhsm']['repo_ca_cert'] % {'ca_cert_dir': ca_cert_dir}]
existing_ca_certs = [cert for cert in ca_candidates if os.path.exists(cert)]
if not existing_ca_certs:
log.warn('None of the ca cert files %s found for the qpid connection' % ca_candidates)

raise
else:
log.info('Using %s as the ca cert for qpid connection' % existing_ca_certs[0])

plugin.cfg.messaging.cacert = existing_ca_certs[0]
plugin.cfg.messaging.url = 'proton+amqps://%s:5647' % rhsm_conf['server']['hostname']
plugin.cfg.messaging.uuid = 'pulp.agent.%s' % certificate.getConsumerId()
bundle(certificate)


def validate_registration():
"""
Validate consumer registration by making a REST call
to the server. Updates the global 'registered' variable.
"""
global registered
registered = False

if ConsumerIdentity.existsAndValid():
consumer = ConsumerIdentity.read()
consumer_id = consumer.getConsumerId()
else:
return

try:
uep = UEP()
consumer = uep.getConsumer(consumer_id)
registered = (consumer is not None)
except GoneException:
registered = False
except RemoteServerException as e:
if e.code not in (http.NOT_FOUND, http.GONE):
log.warn(str(e))
raise
except Exception as e:
log.exception(str(e))
raise


class AgentRestart(object):
"""
Restart the daemon after RPM upgrade.
The %post in the RPM will write the RESTART_FILE. The recurring
'apply' action notices file and restarts the goferd service only when
this plugin is not longer busy.
"""

COMMAND = 'service goferd restart'
RESTART_FILE = '/tmp/katello-agent-restart'

@staticmethod
def is_busy():
"""
Determine if this plugin is busy by counting the pending
requests in the persistent queue.

:return: True if busy.
:rtype: bool
"""
pending = plugin.scheduler.pending
path = os.path.join(pending.PENDING, pending.stream)
count = len(os.listdir(path))
return count > 0

@staticmethod
def restart():
"""
Restart the goferd service.
1. Delete the RESTART_FILE.
2. Restart

:return: The restart command exit value.
:rtype: int
"""
os.unlink(AgentRestart.RESTART_FILE)
p = Popen(AgentRestart.COMMAND, shell=True)
return p.wait()

@action(minutes=3)
def apply(self):
"""
Detect the RESTART_FILE and restart the goferd service
only when this plugin is not longer busy. This ensures that
an RPM update has completed.
"""
if not os.path.exists(AgentRestart.RESTART_FILE):
return
if self.is_busy():
return
log.info('Restarting goferd.')
exit_val = self.restart()
# only reached when restart failed.
log.error('Restart failed, exit=%d', exit_val)


class UEP(UEPConnection):
"""
Represents the UEP.
"""

def __init__(self):
key = ConsumerIdentity.keypath()
cert = ConsumerIdentity.certpath()
UEPConnection.__init__(self, key_file=key, cert_file=cert)


# --- API --------------------------------------------------------------------
class Consumer(object):
"""
When a consumer is unregistered, Katello notifies the goferd.
"""

@remote
def unregister(self):
log.info('Consumer has been unregistered. '
'Katello agent will no longer function until '
'this system is reregistered.')


class Content(object):
"""
Pulp Content Management.
"""

@remote(model=FORK)
def install(self, units, options):
"""
Install the specified content units using the specified options.
Delegated to content handlers.
:param units: A list of content units to be installed.
:type units: list of:
{ type_id:<str>, unit_key:<dict> }
:param options: Install options; based on unit type.
:type options: dict
:return: A dispatch report.
:rtype: DispatchReport
"""
dispatcher = Dispatcher()
report = dispatcher.install(units, options)
return report.dict()

@remote(model=FORK)
def update(self, units, options):
"""
Update the specified content units using the specified options.
Delegated to content handlers.
:param units: A list of content units to be updated.
:type units: list of:
{ type_id:<str>, unit_key:<dict> }
:param options: Update options; based on unit type.
:type options: dict
:return: A dispatch report.
:rtype: DispatchReport
"""
dispatcher = Dispatcher()
report = dispatcher.update(units, options)
return report.dict()

@remote(model=FORK)
def uninstall(self, units, options):
"""
Uninstall the specified content units using the specified options.
Delegated to content handlers.
:param units: A list of content units to be uninstalled.
:type units: list of:
{ type_id:<str>, unit_key:<dict> }
:param options: Uninstall options; based on unit type.
:type options: dict
:return: A dispatch report.
:rtype: DispatchReport
"""
dispatcher = Dispatcher()
report = dispatcher.uninstall( units, options)
return report.dict()
(3-3/3)