Project

General

Profile

« Previous | Next » 

Revision 8fc79619

Added by Jeff Ortel almost 6 years ago

fixes #24006 - support plugin reload when queue not found.

View differences:

test/test_katello/legacy_plugin_test.py
from mock import Mock, patch
from rhsm.connection import RemoteServerException
from rhsm.connection import RemoteServerException, GoneException
sys.path.append(os.path.join(os.path.dirname(__file__), '../../src/'))
......
self.assertEqual(plugin_cfg.messaging.uuid, 'pulp.agent.%s' % consumer_id)
class TestInitializer(PluginTest):
class TestPluginLoaded(PluginTest):
@patch('katello.agent.goferd.legacy_plugin.path_monitor')
@patch('katello.agent.goferd.legacy_plugin.PathMonitor')
@patch('katello.agent.goferd.legacy_plugin.ConsumerIdentity.certpath')
@patch('katello.agent.goferd.legacy_plugin.validate_registration')
def test_init(self, fake_validate, fake_path, fake_pmon):
self.plugin.registered = False
# test
self.plugin.init_plugin()
self.plugin.plugin_loaded()
# validation
fake_path.assert_called_with()
fake_pmon.add.assert_any_call(fake_path(), self.plugin.certificate_changed)
fake_pmon.start.assert_called_with()
fake_pmon.return_value.add.assert_any_call(fake_path(), self.plugin.certificate_changed)
fake_pmon.return_value.start.assert_called_with()
fake_validate.assert_called_with()
@patch('katello.agent.goferd.legacy_plugin.PathMonitor')
@patch('katello.agent.goferd.legacy_plugin.update_settings')
@patch('katello.agent.goferd.legacy_plugin.validate_registration')
def test_registered(self, validate, update_settings):
def test_registered(self, validate, update_settings, path_monitor):
# test
self.plugin.init_plugin()
self.plugin.plugin_loaded()
# validation
validate.assert_called_with()
update_settings.assert_called_with()
self.assertEqual(self.plugin.path_monitor, path_monitor.return_value)
self.assertFalse(self.plugin.plugin.attach.called)
@patch('katello.agent.goferd.legacy_plugin.PathMonitor')
@patch('katello.agent.goferd.legacy_plugin.update_settings')
@patch('katello.agent.goferd.legacy_plugin.validate_registration')
def test_run_not_registered(self, validate, update_settings):
def test_run_not_registered(self, validate, update_settings, path_monitor):
self.plugin.registered = False
# test
self.plugin.init_plugin()
self.plugin.plugin_loaded()
# validation
validate.assert_called_with()
self.assertFalse(update_settings.called)
self.assertEqual(self.plugin.path_monitor, path_monitor.return_value)
self.assertFalse(self.plugin.plugin.attach.called)
@patch('katello.agent.goferd.legacy_plugin.sleep')
@patch('katello.agent.goferd.legacy_plugin.PathMonitor')
@patch('katello.agent.goferd.legacy_plugin.update_settings')
@patch('katello.agent.goferd.legacy_plugin.validate_registration')
def test_run_validate_failed(self, validate, update_settings, sleep):
def test_run_validate_failed(self, validate, update_settings, path_monitor, sleep):
validate.side_effect = [ValueError, None]
# test
self.plugin.init_plugin()
self.plugin.plugin_loaded()
# validation
validate.assert_called_with()
sleep.assert_called_once_with(60)
update_settings.assert_called_with()
self.assertEqual(self.plugin.path_monitor, path_monitor.return_value)
self.assertFalse(self.plugin.plugin.attach.called)
class TestPluginUnloaded(PluginTest):
@patch('katello.agent.goferd.legacy_plugin.path_monitor')
def test_unloaded(self, path_monitor):
self.plugin.plugin_unloaded()
path_monitor.abort.assert_called_once_with()
class TestConduit(PluginTest):
@patch('katello.agent.goferd.legacy_plugin.ConsumerIdentity.read')
......
@patch('katello.agent.goferd.legacy_plugin.UEPConnection.getConsumer')
@patch('katello.agent.goferd.legacy_plugin.ConsumerIdentity.read')
@patch('katello.agent.goferd.legacy_plugin.ConsumerIdentity.existsAndValid')
def test_validate_registration_not_confirmed(self, valid, read, get_consumer):
def test_validate_registration_not_found(self, valid, read, get_consumer):
consumer_id = '1234'
valid.return_value = True
read.return_value.getConsumerId.return_value = consumer_id
......
get_consumer.assert_called_with(consumer_id)
self.assertFalse(self.plugin.registered)
@patch('katello.agent.goferd.legacy_plugin.UEPConnection.getConsumer')
@patch('katello.agent.goferd.legacy_plugin.ConsumerIdentity.read')
@patch('katello.agent.goferd.legacy_plugin.ConsumerIdentity.existsAndValid')
def test_validate_registration_gone(self, valid, read, get_consumer):
consumer_id = '1234'
valid.return_value = True
read.return_value.getConsumerId.return_value = consumer_id
get_consumer.side_effect = GoneException(httplib.GONE, '', '')
# test
self.plugin.validate_registration()
# validation
get_consumer.assert_called_with(consumer_id)
self.assertFalse(self.plugin.registered)
@patch('katello.agent.goferd.legacy_plugin.UEPConnection.getConsumer')
@patch('katello.agent.goferd.legacy_plugin.ConsumerIdentity.read')
@patch('katello.agent.goferd.legacy_plugin.ConsumerIdentity.existsAndValid')

Also available in: Unified diff