4 changes: 4 additions & 0 deletions common/pulp_rpm/common/constants.py
@@ -175,3 +175,7 @@

# The path to the repo_auth.conf file
REPO_AUTH_CONFIG_FILE = '/etc/pulp/repo_auth.conf'

# keys used in the sync conduit's scratchpad
REPOMD_REVISION_KEY = 'repomd_revision'
PREVIOUS_SKIP_LIST = 'previous_skip_list'
3 changes: 3 additions & 0 deletions docs/user-guide/release-notes/2.7.x.rst
@@ -12,3 +12,6 @@ The repo authentication functionality associated with pulp_rpm has been moved
into platform. This allows other plugins to take advantage of this
functionality.

The sync process now checks the revision number in the upstream repository metadata to determine
whether it has changed since the previous sync. If it has not, most steps in the sync are skipped.
On large repositories, this reduces best-case sync times from the order of minutes to a few seconds.
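
For reference, here is a minimal sketch of the check described in that note. The helper name is
illustrative only; the real implementation is in get_metadata() in
plugins/pulp_rpm/plugins/importers/yum/sync.py further down in this diff, and the scratchpad key
strings correspond to the new constants added in constants.py.

def should_skip_repomd_steps(scratchpad, metadata_files, current_skip_list):
    # Illustrative sketch -- not the exact code from sync.py.
    previous_revision = scratchpad.get('repomd_revision', 0)
    previous_skip_set = set(scratchpad.get('previous_skip_list', []))
    # skip only if the upstream revision has not increased and nothing that was
    # skipped on the previous run has since been removed from the skip list
    return (metadata_files.revision <= previous_revision and
            previous_skip_set <= set(current_skip_list))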
3 changes: 0 additions & 3 deletions plugins/pulp_rpm/plugins/importers/yum/parse/treeinfo.py
@@ -54,7 +54,6 @@ def sync(sync_conduit, feed, working_dir, nectar_config, report, progress_callba
treefile_path = get_treefile(feed, tmp_dir, nectar_config)
if not treefile_path:
_LOGGER.debug('no treefile found')
report['state'] = constants.STATE_COMPLETE
return

try:
@@ -70,7 +69,6 @@ def sync(sync_conduit, feed, working_dir, nectar_config, report, progress_callba
# skip this whole process if the upstream treeinfo file hasn't changed
if len(existing_units) == 1 and existing_distribution_is_current(existing_units[0], model):
_LOGGER.debug('upstream distribution unchanged; skipping')
report['state'] = constants.STATE_COMPLETE
return

# Get any errors
@@ -104,7 +102,6 @@ def sync(sync_conduit, feed, working_dir, nectar_config, report, progress_callba
report['error_details'] = [(fail.url, fail.error_report) for fail in
listener.failed_reports]
return
report['state'] = constants.STATE_COMPLETE
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)

15 changes: 4 additions & 11 deletions plugins/pulp_rpm/plugins/importers/yum/repomd/metadata.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-

import contextlib
from copy import deepcopy
import gdbm
import bz2
@@ -137,8 +138,6 @@ def download_repomd(self):
error_report = self.event_listener.failed_reports[0]
raise IOError(error_report.error_msg)

# TODO (jconnonr 2013-03-07) add a method to validate/verify the repomd.xml file

def parse_repomd(self):
"""
Parse the downloaded repomd.xml file and populate the metadata dictionary.
@@ -165,7 +164,7 @@ def parse_repomd(self):
root_element.clear()

if element.tag == REVISION_TAG:
self.revision = element.text
self.revision = int(element.text)
Contributor Author: I confirmed that no other code was using this field.


if element.tag == DATA_TAG:
file_info = process_repomd_data_element(element)
@@ -277,24 +276,18 @@ def generate_dbs(self):
filelists.PACKAGE_TAG, filelists.process_package_element),
(other.METADATA_FILE_NAME, other.PACKAGE_TAG, other.process_package_element),
):
xml_file_handle = self.get_metadata_file_handle(filename)
try:
with contextlib.closing(self.get_metadata_file_handle(filename)) as xml_file_handle:
Contributor Author: This isn't directly related, but while I was using contextlib, I couldn't help myself.

generator = package_list_generator(xml_file_handle, tag)
db_filename = os.path.join(self.dst_dir, '%s.db' % filename)
# 'n': always create a new file; 'f': open in fast mode (writes are not synchronized)
db_file_handle = gdbm.open(db_filename, 'nf')
try:
with contextlib.closing(gdbm.open(db_filename, 'nf')) as db_file_handle:
for element in generator:
utils.strip_ns(element)
raw_xml = utils.element_to_raw_xml(element)
unit_key, _ = process_func(element)
db_key = self.generate_db_key(unit_key)
db_file_handle[db_key] = raw_xml
db_file_handle.sync()
finally:
db_file_handle.close()
finally:
xml_file_handle.close()
self.dbs[filename] = db_filename

@staticmethod
160 changes: 106 additions & 54 deletions plugins/pulp_rpm/plugins/importers/yum/sync.py
@@ -1,3 +1,4 @@
import contextlib
import functools
import logging
import os
@@ -60,6 +61,8 @@ def __init__(self, repo, sync_conduit, call_config):

flat_call_config = call_config.flatten()
self.nectar_config = nectar_utils.importer_config_to_nectar_config(flat_call_config)
self.skip_repomd_steps = False
self.current_revision = 0

def set_progress(self):
"""
@@ -82,6 +85,41 @@ def sync_feed(self):
repo_url += '/'
return repo_url

@contextlib.contextmanager
def update_state(self, state_dict, unit_type=None):
Contributor Author: This cleanup of the way the run() method manages the state of steps enabled me to add the additional skipping logic without that method getting unreasonably complex.

"""
Manage the state of a step in the sync process. This sets the state to
running and complete when appropriate, and optionally decides if the
step should be skipped (if a unit_type is passed in). This reports
progress before and after the step executes.

This context manager yields a boolean value; if True, the step should
be skipped.

:param state_dict: any dictionary containing a key "state" whose value
is the state of the current step.
:type state_dict: dict
:param unit_type: optional unit type. If provided, and if the value
appears in the list of configured types to skip,
the state will be set to SKIPPED and the yielded
boolean will be True.
:type unit_type: str
"""
skip_config = self.call_config.get(constants.CONFIG_SKIP, [])
skip = unit_type is not None and unit_type in skip_config

if skip:
state_dict[constants.PROGRESS_STATE_KEY] = constants.STATE_SKIPPED
else:
state_dict[constants.PROGRESS_STATE_KEY] = constants.STATE_RUNNING
self.set_progress()

yield skip

if state_dict[constants.PROGRESS_STATE_KEY] == constants.STATE_RUNNING:
state_dict[constants.PROGRESS_STATE_KEY] = constants.STATE_COMPLETE
self.set_progress()

def run(self):
"""
Steps through the entire workflow of a repo sync.
@@ -93,56 +131,40 @@ def run(self):
# we delete below
self.tmp_dir = tempfile.mkdtemp(dir=self.working_dir)
try:
self.progress_status['metadata']['state'] = constants.STATE_RUNNING
self.set_progress()

# Verify that we have a feed url. if there is no feed url then we have nothing to sync
if self.sync_feed is None:
raise PulpCodedException(error_code=error_codes.RPM1005)
with self.update_state(self.progress_status['metadata']):
# Verify that we have a feed url.
# if there is no feed url, then we have nothing to sync
if self.sync_feed is None:
raise PulpCodedException(error_code=error_codes.RPM1005)

metadata_files = self.get_metadata()
# Save the default checksum from the metadata
self.save_default_metadata_checksum_on_repo(metadata_files)
metadata_files = self.get_metadata()
# Save the default checksum from the metadata
self.save_default_metadata_checksum_on_repo(metadata_files)

if self.progress_status['metadata']['state'] == constants.STATE_RUNNING:
self.progress_status['metadata']['state'] = constants.STATE_COMPLETE
self.set_progress()

self.content_report['state'] = constants.STATE_RUNNING
self.set_progress()
self.update_content(metadata_files)
if self.content_report['state'] == constants.STATE_RUNNING:
self.content_report['state'] = constants.STATE_COMPLETE
self.set_progress()
with self.update_state(self.content_report) as skip:
if not (skip or self.skip_repomd_steps):
self.update_content(metadata_files)

_logger.info(_('Downloading additional units.'))
if models.Distribution.TYPE in self.call_config.get(constants.CONFIG_SKIP, []):
self.distribution_report['state'] = constants.STATE_SKIPPED
else:
self.distribution_report['state'] = constants.STATE_RUNNING
self.set_progress()
treeinfo.sync(self.sync_conduit, self.sync_feed, self.tmp_dir,
self.nectar_config, self.distribution_report, self.set_progress)
self.set_progress()

if models.Errata.TYPE in self.call_config.get(constants.CONFIG_SKIP, []):
self.progress_status['errata']['state'] = constants.STATE_SKIPPED
else:
self.progress_status['errata']['state'] = constants.STATE_RUNNING
self.set_progress()
self.get_errata(metadata_files)
self.progress_status['errata']['state'] = constants.STATE_COMPLETE
self.set_progress()

self.progress_status['comps']['state'] = constants.STATE_RUNNING
self.set_progress()
self.get_comps_file_units(metadata_files, group.process_group_element, group.GROUP_TAG)
self.get_comps_file_units(metadata_files, group.process_category_element,
group.CATEGORY_TAG)
self.get_comps_file_units(metadata_files, group.process_environment_element,
group.ENVIRONMENT_TAG)
self.progress_status['comps']['state'] = constants.STATE_COMPLETE
self.set_progress()
with self.update_state(self.distribution_report, models.Distribution.TYPE) as skip:
if not skip:
treeinfo.sync(self.sync_conduit, self.sync_feed, self.tmp_dir,
self.nectar_config, self.distribution_report,
self.set_progress)

with self.update_state(self.progress_status['errata'], models.Errata.TYPE) as skip:
if not (skip or self.skip_repomd_steps):
self.get_errata(metadata_files)

with self.update_state(self.progress_status['comps']) as skip:
if not (skip or self.skip_repomd_steps):
self.get_comps_file_units(metadata_files, group.process_group_element,
group.GROUP_TAG)
self.get_comps_file_units(metadata_files, group.process_category_element,
group.CATEGORY_TAG)
self.get_comps_file_units(metadata_files, group.process_environment_element,
group.ENVIRONMENT_TAG)

except CancelException:
report = self.sync_conduit.build_cancel_report(self._progress_summary,
Expand All @@ -166,6 +188,7 @@ def run(self):
# clean up whatever we may have left behind
shutil.rmtree(self.tmp_dir, ignore_errors=True)

self.save_repomd_revision()
_logger.info(_('Sync complete.'))
return self.sync_conduit.build_success_report(self._progress_summary, self.progress_status)

@@ -207,16 +230,45 @@ def get_metadata(self):
_logger.debug(traceback.format_exc())
raise PulpCodedException(error_code=error_codes.RPM1006)

_logger.info(_('Downloading metadata files.'))
scratchpad = self.sync_conduit.get_scratchpad() or {}
previous_revision = scratchpad.get(constants.REPOMD_REVISION_KEY, 0)
previous_skip_set = set(scratchpad.get(constants.PREVIOUS_SKIP_LIST, []))
current_skip_set = set(self.call_config.get(constants.CONFIG_SKIP, []))
self.current_revision = metadata_files.revision
        # if the revision hasn't increased and no type that was skipped on the previous run has
        # since been removed from the skip list...
if metadata_files.revision <= previous_revision \
and previous_skip_set - current_skip_set == set():
_logger.info(_('upstream repo metadata has not changed. Skipping steps.'))
self.skip_repomd_steps = True
return metadata_files
else:
_logger.info(_('Downloading metadata files.'))

metadata_files.download_metadata_files()
self.downloader = None
_logger.info(_('Generating metadata databases.'))
metadata_files.generate_dbs()
self.import_unknown_metadata_files(metadata_files)
# TODO: verify metadata
# metadata_files.verify_metadata_files()
return metadata_files
metadata_files.download_metadata_files()
self.downloader = None
_logger.info(_('Generating metadata databases.'))
metadata_files.generate_dbs()
self.import_unknown_metadata_files(metadata_files)
return metadata_files

def save_repomd_revision(self):
"""
If there were no errors during the sync, save the repomd revision
number to the scratchpad along with the configured skip list used
by this run.
"""
non_success_states = (constants.STATE_FAILED, constants.STATE_CANCELLED)
if len(self.content_report['error_details']) == 0\
and self.content_report[constants.PROGRESS_STATE_KEY] not in non_success_states:
_logger.debug(_('saving repomd.xml revision number and skip list to scratchpad'))
scratchpad = self.sync_conduit.get_scratchpad() or {}
scratchpad[constants.REPOMD_REVISION_KEY] = self.current_revision
# we save the skip list so if one of the types contained in it gets removed, the next
# sync will know to not skip based on repomd revision
scratchpad[constants.PREVIOUS_SKIP_LIST] = self.call_config.get(
constants.CONFIG_SKIP, [])
self.sync_conduit.set_scratchpad(scratchpad)

def save_default_metadata_checksum_on_repo(self, metadata_files):
"""