Submit scraped items to a message queue (AMQP)


/ Published in: Python
Save to your folder(s)



Copy this code and paste it in your HTML
  1. # This pipeline enqueues scraped items to a message queue. It depends on the [carrot](http://ask.github.com/carrot/genindex.html) library. [Gist](https://gist.github.com/1574231)
  2.  
  3. from scrapy.xlib.pydispatch import dispatcher
  4. from scrapy import signals
  5. from scrapy.exceptions import DropItem
  6. from scrapy.utils.serialize import ScrapyJSONEncoder
  7.  
  8. from carrot.connection import BrokerConnection
  9. from carrot.messaging import Publisher
  10.  
  11. from twisted.internet.threads import deferToThread
  12.  
  class MessageQueuePipeline(object):
      """Scrapy item pipeline that publishes each scraped item to a message queue.

      A carrot ``BrokerConnection`` is created when the pipeline is built. When
      a spider opens, a ``Publisher`` is created on the default (``""``)
      exchange with the spider's name as routing key. Every item is encoded
      with the configured encoder and sent from a thread started through
      ``deferToThread``; the item itself is passed on unchanged.

      The pipeline keeps a single publisher, so it serves one spider at a time.
      """

      def __init__(self, host_name, port, userid, password, virtual_host, encoder_class):
          """Create the broker connection and subscribe to the spider signals.

          Args:
              host_name: Broker host name.
              port: Broker port.
              userid: User name used to authenticate with the broker.
              password: Password used to authenticate with the broker.
              virtual_host: Broker virtual host.
              encoder_class: Zero-argument callable returning an object with an
                  ``encode(obj)`` method (e.g. ``ScrapyJSONEncoder``).
          """
          # Function-scope import so the file's import block stays untouched.
          import threading

          self.q_connection = BrokerConnection(
              hostname=host_name,
              port=port,
              userid=userid,
              password=password,
              virtual_host=virtual_host,
          )
          self.encoder = encoder_class()
          # Defined up front so spider_closed() is safe even when no spider
          # was ever opened.
          self.publisher = None
          # process_item() hands work to thread-pool threads that all share one
          # publisher; sends are serialized because AMQP client channels are
          # generally not safe for concurrent use.
          self._send_lock = threading.Lock()
          dispatcher.connect(self.spider_opened, signals.spider_opened)
          dispatcher.connect(self.spider_closed, signals.spider_closed)

      @classmethod
      def from_settings(cls, settings):
          """Build the pipeline from Scrapy settings.

          Settings read (with defaults): ``BROKER_HOST`` (``'localhost'``),
          ``BROKER_PORT`` (``5672``), ``BROKER_USERID`` (``"guest"``),
          ``BROKER_PASSWORD`` (``"guest"``), ``BROKER_VIRTUAL_HOST`` (``"/"``)
          and ``MESSAGE_Q_SERIALIZER`` (``ScrapyJSONEncoder``).

          ``MESSAGE_Q_SERIALIZER`` may be an encoder class or a dotted import
          path to one.
          """
          encoder_class = settings.get('MESSAGE_Q_SERIALIZER')
          if encoder_class is None:
              encoder_class = ScrapyJSONEncoder
          elif not callable(encoder_class):
              # Settings coming from the command line or a settings module are
              # often dotted-path strings; resolve them to the actual class.
              from scrapy.utils.misc import load_object
              encoder_class = load_object(encoder_class)

          return cls(
              settings.get('BROKER_HOST', 'localhost'),
              # getint() so a port supplied as a string still reaches the
              # broker client as an integer.
              settings.getint('BROKER_PORT', 5672),
              settings.get('BROKER_USERID', "guest"),
              settings.get('BROKER_PASSWORD', "guest"),
              settings.get('BROKER_VIRTUAL_HOST', "/"),
              encoder_class,
          )

      def spider_opened(self, spider):
          """Open a publisher whose routing key is the spider's name."""
          self.publisher = Publisher(
              connection=self.q_connection,
              exchange="",
              routing_key=spider.name,
          )

      def spider_closed(self, spider):
          """Close the publisher (if one was opened) and the broker connection."""
          publisher, self.publisher = self.publisher, None
          try:
              if publisher is not None:
                  publisher.close()
          finally:
              # Release the broker connection as well; closing only the
              # publisher would leave the connection open.
              self.q_connection.close()

      def process_item(self, item, spider):
          """Publish ``item`` in a worker thread.

          Returns:
              The Deferred produced by ``deferToThread``; it fires with
              ``item`` once the message has been sent.
          """
          return deferToThread(self._process_item, item, spider)

      def _process_item(self, item, spider):
          """Encode ``item`` and send it as ``{"scraped_data": <encoded>}``.

          Returns:
              The unmodified ``item``.
          """
          # Encode outside the lock; only the send touches the shared publisher.
          payload = {"scraped_data": self.encoder.encode(dict(item))}
          with self._send_lock:
              self.publisher.send(payload)
          return item
  45.  
  46. # Snippet imported from snippets.scrapy.org (which no longer works)
  47. # author: zsquare
  48. # date : Jan 07, 2012
  49.  

Report this snippet


Comments

RSS Icon Subscribe to comments

You need to login to post a comment.