Posted By

scrapy on 09/01/12


Tagged

Carrot scrapy amqp message-queue


Versions (?)

Submit scraped items to Message Queue (amqp)


Published in: Python
 

  1. # This pipeline enqueues scraped items to a message queue. It depends on the [carrot](http://ask.github.com/carrot/genindex.html) library. [Gist](https://gist.github.com/1574231)
  2.  
  3. from scrapy.xlib.pydispatch import dispatcher
  4. from scrapy import signals
  5. from scrapy.exceptions import DropItem
  6. from scrapy.utils.serialize import ScrapyJSONEncoder
  7.  
  8. from carrot.connection import BrokerConnection
  9. from carrot.messaging import Publisher
  10.  
  11. from twisted.internet.threads import deferToThread
  12.  
  13. class MessageQueuePipeline(object):
  14. def __init__(self, host_name, port, userid, password, virtual_host, encoder_class):
  15. self.q_connection = BrokerConnection(hostname=host_name, port=port,
  16. userid=userid, password=password,
  17. virtual_host=virtual_host)
  18. self.encoder = encoder_class()
  19. dispatcher.connect(self.spider_opened, signals.spider_opened)
  20. dispatcher.connect(self.spider_closed, signals.spider_closed)
  21.  
  22. @classmethod
  23. def from_settings(cls, settings):
  24. host_name = settings.get('BROKER_HOST', 'localhost')
  25. port = settings.get('BROKER_PORT', 5672)
  26. userid = settings.get('BROKER_USERID', "guest")
  27. password = settings.get('BROKER_PASSWORD', "guest")
  28. virtual_host = settings.get('BROKER_VIRTUAL_HOST', "/")
  29. encoder_class = settings.get('MESSAGE_Q_SERIALIZER', ScrapyJSONEncoder)
  30. return cls(host_name, port, userid, password, virtual_host, encoder_class)
  31.  
  32. def spider_opened(self, spider):
  33. self.publisher = Publisher(connection=self.q_connection,
  34. exchange="", routing_key=spider.name)
  35.  
  36. def spider_closed(self, spider):
  37. self.publisher.close()
  38.  
  39. def process_item(self, item, spider):
  40. return deferToThread(self._process_item, item, spider)
  41.  
  42. def _process_item(self, item, spider):
  43. self.publisher.send({"scraped_data": self.encoder.encode(dict(item))})
  44. return item
  45.  
  46. # Snippet imported from snippets.scrapy.org (which no longer works)
  47. # author: zsquare
  48. # date : Jan 07, 2012
  49.  

Report this snippet  

You need to login to post a comment.