Incremental Imports of Archival Content with Feeds

Joe Turgeon
#Drupal

Using the Feeds contrib module for Drupal is a popular route for importing content from RSS feeds or similar streams of data, including service APIs like Twitter or Flickr and even email accounts. The module provides robust methods to regularly check a feed for updates and to create or update nodes by mapping feed content to Drupal entities.

Since RSS feeds usually provide the most recent batch of content, the typical Feeds implementation will incrementally import feed items on an ongoing basis. However, some use cases call for also importing all historical content from the same data source. For example, a new website may need to feature both new and archival content from an external blog.

In such a case, if the historical content from the source system can be exposed using the same format or method as the feed for ongoing updates, then it may be beneficial to use the same process to accomplish both the archival and ongoing import.

A Migration By Any Other Name?

If the use case only involves the archival content import, then this would be a plain and simple migration, and I would likely recommend the Migrate contrib module. However, Migrate is less suited for ongoing, regularly scheduled imports from remote data sources. So in this case, I’d rather find a way for Feeds to work for the archival part of the job.

That said, when implementing this with Feeds, we can take a few lessons from the Migrate module about what makes a good migration system:

  • It should be able to run incrementally. The script should not assume that all of the content can be handled in a single operation. There may be network interruptions or resource limitations that break the script on any given run.

  • It should not create duplicates if it is run multiple times. There should be no penalty to running the import multiple times either to confirm a successful import or to catch content added since the last import.

  • It should be able to be “rolled back” easily. Due to changes made to the importer configuration or the source content, it’s often necessary to remove all imported content and run the import process again.

Feeds provides good support for these requirements. It can run repeatedly on the same feed, and will only create content for new feed items -- either ignoring or updating (based on configuration) content imported on previous runs. It also provides the ability to delete all items from the feed, which satisfies the rollback requirement.
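
As a side note, the delete-all operation is available both on the importer’s import page and programmatically. The following is a minimal sketch (assuming an importer with the id example_archive, the name used in the examples later in this article) that could be run from an update hook or a drush php-eval call:

// Clear everything previously imported by this importer (the Feeds
// equivalent of rolling back a migration).
$source = feeds_source('example_archive');
$source->startClear();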

However, there is a fundamental challenge to using Feeds for this use case. It assumes the feed source has a constant location; for example, the RSS feed URL for a blog is always the same. To support an archival migration, it is likely that you will need to retrieve the content from multiple locations or process it in batches.

Handling Multiple Importer Configurations

The general issue is that the feed importer needed for historical imports may need to be configured differently than one for ongoing imports. One basic difference may be the feed URL. The archival feed source may need to specify ranges of content by page or date or it may be an export file rather than a URL. Another difference may be the format, where a different feed parser is needed for the historical import.

To handle multiple configurations, you have two options: use one feed importer and change its configuration when switching from the historical import to the ongoing import, or define two feed importers, one for the historical import and one for the ongoing import.

Since it’s important to be able to easily run both imports at any time in order to catch potential new feed items, having to switch feed configuration could be problematic. Instead, I’d prefer to set up one feed importer, say for the ongoing import, and then clone it and tweak the configuration as needed for the historical import.

There’s one issue with this approach: Feeds only checks for duplicate content within the context of a single feed importer configuration. If the same item appears in both the ongoing and archival feeds, each importer will create its own copy, resulting in duplicate content.

A simple solution is to extend the standard node processor plugin and override one function so that imported content items are checked for uniqueness across the target content type. This plugin is available in a drupal.org sandbox project by Steven Jones, or it can be implemented by including the following class in a custom module:

 

<?php

/**
 * Class definition for FeedsUniqueNodeProcessor.
 *
 * Allows checking for uniqueness among all nodes in a given content type,
 * rather than only nodes imported with a given feeds importer.
 *
 * Based on: https://drupal.org/sandbox/darthsteven/1444686
 */
class FeedsUniqueNodeProcessor extends FeedsNodeProcessor {

  /**
   * Retrieve the target entity's existing id if available. Otherwise return 0.
   *
   * @ingroup mappingapi
   *
   * @param FeedsSource $source
   *   The source information about this import.
   * @param $result
   *   A FeedsParserResult object.
   *
   * @return
   *   The serial id of an entity if found, 0 otherwise.
   */
  protected function existingEntityId(FeedsSource $source, FeedsParserResult $result) {
    // Unlike the parent implementation, do not limit the lookup to items
    // imported by this particular importer.
    $query = db_select('feeds_item')
      ->fields('feeds_item', array('entity_id'))
      ->condition('entity_type', $this->entityType());

    // Iterate through all unique targets and test whether they already
    // exist in the database.
    foreach ($this->uniqueTargets($source, $result) as $target => $value) {
      switch ($target) {
        case 'url':
          $entity_id = $query->condition('url', $value)->execute()->fetchField();
          break;

        case 'guid':
          $entity_id = $query->condition('guid', $value)->execute()->fetchField();
          break;
      }
      if (isset($entity_id) && $entity_id) {
        // Return with the content id found.
        return $entity_id;
      }
    }

    if ($nid = parent::existingEntityId($source, $result)) {
      return $nid;
    }

    return 0;
  }

}

Once this plugin is available to Drupal through a module, both the ongoing and archival feed importers should be configured to use this feed processor.
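
For reference, here is a minimal sketch of how a custom module (named example_feeds here, to match the later examples) might register the class with Feeds via hook_feeds_plugins(); the plugin name, description, and file name are illustrative:

/**
 * Implements hook_feeds_plugins().
 *
 * Registers the FeedsUniqueNodeProcessor plugin so it appears as a
 * processor option in the Feeds importer configuration UI.
 */
function example_feeds_feeds_plugins() {
  return array(
    'FeedsUniqueNodeProcessor' => array(
      'name' => 'Unique node processor',
      'description' => 'Creates nodes, checking uniqueness across the target content type.',
      'handler' => array(
        'parent' => 'FeedsNodeProcessor',
        'class' => 'FeedsUniqueNodeProcessor',
        // Assumes the class above lives in this file within the module.
        'file' => 'FeedsUniqueNodeProcessor.inc',
        'path' => drupal_get_path('module', 'example_feeds'),
      ),
    ),
  );
}

After enabling the module, you may need to clear caches so that Feeds picks up the new plugin; it will then be available when configuring each importer’s processor.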

Importing Historical Feed Items

Most RSS feeds are limited to a certain number of items, so that as new content is published, the old content “falls off” the feed. Some RSS feeds allow providing a URL parameter to specify the page of results or date range to retrieve. For example, WordPress blogs accept a “paged” URL parameter that specifies which page of results to return (e.g., http://example.com/feed/?paged=2).

If an RSS feed supports this, then all historical content could be imported by incrementally fetching and processing consecutive batches of content.

As mentioned earlier, Feeds assumes that the source for a feed is always the same, so this approach requires code to manage the feed source and iterate through the complete range of content. Therefore, unlike the ongoing feed importer, which can appropriately run automatically at some interval, the historical feed importer should not be set to run automatically; instead, custom code will drive it through the pages of the archive.
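
In practice this means setting the archival importer’s “Periodic import” option to “Off”. If you want to enforce that in code, a minimal sketch (assuming the archival importer already exists with the id example_archive used throughout these examples) could be run from an update hook or drush php-eval:

// Make sure the archival importer never runs on Feeds' own schedule;
// FEEDS_SCHEDULE_NEVER (-1) corresponds to the "Off" option in the UI.
// The cron queue shown below drives this importer instead.
$importer = feeds_importer('example_archive');
$importer->addConfig(array('import_period' => FEEDS_SCHEDULE_NEVER));
$importer->save();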

The following is a sample of code for managing this incremental import. The approach is:

  • Define a cron queue worker that points the archival feed importer at the URL for a given page of results and then runs the import.

  • On cron, enqueue the job for page 1 if the import hasn’t already been started.

  • Use hook_feeds_after_import() to detect when the archival feed importer finishes so that the next page can be added to the queue.

  • Define a helper function that does a test fetch of the next page of feed content so that we can gracefully end the import process if the last page was reached.

 

/**
 * Implements hook_cron().
 */
function example_feeds_cron() {
  // Enqueue the example_feeds_blog_archive job on page 1, if it has not been
  // started and the job queue is empty.
  if (variable_get('example_feeds_blog_archive_page', -1) < 0) {
    $queue = DrupalQueue::get('example_feeds_blog_archive');
    if (!$queue->numberOfItems()) {
      example_feeds_blog_archive_next(1);
    }
  }
}

/**
 * Implements hook_cron_queue_info().
 *
 * Define cron queue jobs for this module.
 */
function example_feeds_cron_queue_info() {
  $queues = array();
  $queues['example_feeds_blog_archive'] = array(
    'worker callback' => 'example_feeds_blog_archive_get',
    'time' => 60,
  );
  return $queues;
}

/**
 * Implements hook_feeds_after_import().
 *
 * Handles the post-import event for feeds importers defined by this module.
 */
function example_feeds_feeds_after_import($feed) {
  // When the example_archive feeds importer finishes, queue the
  // importer on the next page.
  if ($feed->importer()->id === 'example_archive') {
    example_feeds_blog_archive_next();
  }
}

/**
 * Enqueue the next page (or the specified page) for the blog archive feed
 * importer. Check that the feed page exists first.
 */
function example_feeds_blog_archive_next($page = -1) {
  if ($page < 0) {
    $page = variable_get('example_feeds_blog_archive_page', -1) + 1;
  }

  $source_url = 'http://example.com/feed/?paged=' . intval($page);
  feeds_include_library('http_request.inc', 'http_request');
  $result = http_request_get($source_url, NULL, NULL, NULL, 5);
  if (in_array($result->code, array(200, 201, 202, 203, 204, 205, 206))) {
    // If the request was successful, then queue the job to import the feed.
    $queue = DrupalQueue::get('example_feeds_blog_archive');
    $queue->createItem(array($page));
    variable_set('example_feeds_blog_archive_page', $page);
    return $page;
  }
  drupal_set_message(t('Cannot fetch page %page-num of the blog archive feed, so ending import.', array('%page-num' => $page)));
  return FALSE;
}

/**
 * Configure the feed importer to fetch the given page of the blog feed.
 */
function example_feeds_blog_archive_get($info) {
  if (!empty($info[0]) && $page = intval($info[0])) {
    $source_url = 'http://example.com/feed/?paged=' . $page;
    $source = feeds_source('example_archive');
    $source_config = array('FeedsHTTPFetcher' => array('source' => $source_url));
    $source->addConfig($source_config);
    $source->save();
    $source->startImport();
  }
}

With this code to support the archival import, both the archival and ongoing feed importers can be enabled and ready to run. The archival import will work through the historical content once, and the ongoing import will run regularly to pick up new items in the feed.

Providing Administrative Control

To round off this solution, it’s important to provide a few minimal administrative controls and status information.

By using the cron queue worker, the importer will process as many pages as it can during a cron run and can pick up where it left off. This allows handling a large number of pages of archival content without worrying about memory or execution time limits. However, it also means the number of cron runs needed to complete the import may vary. Therefore, the person managing the import will likely want to see how many pages have been processed.

What if the importer process has a glitch and stops midway through a run? What if the feed’s source site has an outage causing the import to fail? For these reasons, it’s also crucial to have the ability to “nudge” the importer to try again from where it left off.

In some cases, you may want to re-run the entire import process. Maybe the ongoing importer did not run frequently enough and missed new content items before they “fell off” the first page of the feed. For these reasons, we need to provide a way to restart the importer as needed.

The following sample code alters the Feeds import page for the archival importer to add status information on the last page fetched and administrative operations for retrying the next page and restarting the whole import.

 

/**
 * Implements hook_form_FORM_ID_alter().
 *
 * Handles alterations to the feeds_import_form for importers defined by this
 * module.
 */
function example_feeds_form_feeds_import_form_alter(&$form, &$form_state, $form_id) {
  if ($form['#importer_id'] === 'example_archive') {
    $form['source_page_status'] = array(
      '#type' => 'fieldset',
      '#title' => t('Incremental Fetch Status'),
      '#weight' => -1,
    );
    $form['source_page_status']['page_num'] = array(
      '#type' => 'item',
      '#title' => t('Last page fetched'),
      '#markup' => variable_get('example_feeds_blog_archive_page', t('Not set')),
    );
    $form['source_page_status']['page_retry'] = array(
      '#type' => 'submit',
      '#name' => 'page_retry',
      '#value' => t('Retry next page of import'),
      '#submit' => array('example_feeds_form_feeds_import_form_blog_archive_submit'),
    );
    $form['source_page_status']['page_restart'] = array(
      '#type' => 'submit',
      '#name' => 'page_restart',
      '#value' => t('Restart import'),
      '#submit' => array('example_feeds_form_feeds_import_form_blog_archive_submit'),
    );
  }
}

/**
 * Form submit handler for custom operations for the blog archive import form.
 */
function example_feeds_form_feeds_import_form_blog_archive_submit($form, &$form_state) {
  if (!empty($form_state['clicked_button']['#name'])) {
    if ($form_state['clicked_button']['#name'] === 'page_retry') {
      if ($page = example_feeds_blog_archive_next()) {
        drupal_set_message(t('The blog archive job will process page %page-num on the next cron run.', array('%page-num' => $page)));
      }
    }
    elseif ($form_state['clicked_button']['#name'] === 'page_restart') {
      variable_del('example_feeds_blog_archive_page');
      $queue = DrupalQueue::get('example_feeds_blog_archive');
      $queue->deleteQueue();
      drupal_set_message(t('The blog archive job will restart on the next cron run.'));
    }
  }
}

With these administrative controls, the combination of an archival and ongoing feed importer can successfully provide a robust and unified approach to migrating existing content as well as catching newly published content.

Putting It All Together

The Feeds contrib module provides a flexible base for a variety of tasks based on fetching and processing content from external sources. This article illustrates an approach for using Feeds to support an incremental import of archival content. This approach may be useful if you are planning a migration from or integration with another CMS.

For more on these topics, see Exposing External Content to Drupal's Search, Planning Your Content Migration, and the sketches and talk linked from Sketching a Successful Drupal Migration.
