Posted By

scrapy on 09/01/12


Tagged

database Asynchronous twisted scrapy adbapi


Versions (?)

Async Twisted Db Pipeline


 / Published in: Python
 

  1. # This pipeline uses a shared database pool to conserve resources during asynchronous item processing.
  2.  
import logging
import time

import MySQLdb.cursors
from twisted.enterprise import adbapi
  5.  
  6. class InventoryPipeline(object):
  7.  
  8. def __init__(self):
  9. """
  10. Connect to the database in the pool.
  11.  
  12. .. note:: hardcoded db settings
  13. """
  14. self.dbpool = adbapi.ConnectionPool('MySQLdb',
  15. db='database',
  16. user='user',
  17. passwd='password',
  18. cursorclass=MySQLdb.cursors.DictCursor,
  19. charset='utf8',
  20. use_unicode=True
  21. )
  22.  
  23. def process_item(self, spider, item):
  24. """
  25. Run db query in thread pool and call :func:`_conditional_insert`.
  26. We only want to process Items of type `InventoryItem`.
  27.  
  28. :param spider: The spider that created the Item
  29. :type spider: spider
  30. :param item: The Item to process
  31. :type item: Item
  32. :returns: Item
  33. """
  34. if isinstance(item, InventoryItem):
  35. query = self.dbpool.runInteraction(self._conditional_insert, item)
  36. query.addErrback(self._database_error, item)
  37.  
  38. return item
  39.  
  40. def _conditional_insert(self, tx, item):
  41. """
  42. Insert an entry in the `log` table and update the `seller` table,
  43. if neccissary, with the seller's name.
  44.  
  45. :param tx: Database cursor
  46. :type tx: MySQLdb.cursors.DictCursor
  47. :param item: The Item to process
  48. :type item: Item
  49. """
  50. tx.execute("SELECT id, name FROM seller WHERE id = %s", (item['seller_id']))
  51. result = tx.fetchone()
  52. if result:
  53. log.msg("Seller already in db: %d, %s, %s, %s" % (result['id'], item['seller_id'], item['seller_name'], result['name']), level=log.DEBUG)
  54. self.sid = result['id']
  55. if not item['seller_name']:
  56. item['seller_name'] = result['name']
  57. log.msg('Should update the name %s to %s, but not going to do it now.' % (result['name'], item['seller_name']), level=log.DEBUG)
  58. elif item['seller_name']:
  59. log.msg("Inserting into seller table: %s, %s" % (item['seller_id'], item['seller_name']), level=log.DEBUG)
  60. tx.execute(\
  61. "insert into seller (id, name, logged) "
  62. "values (%s, %s, %s)",
  63. (item['seller_id'],
  64. item['seller_name'],
  65. time.time(),
  66. ) )
  67.  
  68. # add item record in the db
  69. log.msg("Inserting item: %s" % item, level=log.DEBUG)
  70. tx.execute("""
  71. insert into item (
  72. `seller_idfk`, `batch_id`, `index`, `asin`, `title`, `quantity`, `cond`, `price`
  73. ) values
  74. ( %s, %s, %s, %s, %s, %s, %s, %s )
  75. """, (
  76. self.sid,
  77. item['batch_id'],
  78. item['index'],
  79. item['asin'],
  80. item['title'],
  81. item['qty'],
  82. item['cond'],
  83. item['price'],
  84. ) )
  85.  
  86. def _database_error(self, e, item):
  87. """
  88. Log an exception to the Scrapy log buffer.
  89. """
  90. print "Database error: ", e
  91.  
  92. # Snippet imported from snippets.scrapy.org (which no longer works)
  93. # author: stav
  94. # date : Nov 16, 2011
  95.  

Report this snippet  

You need to login to post a comment.