"""Small client for reading and editing a Blogger blog through its Atom feed.

This is a Python 2 module (it relies on ``httplib``, ``sha`` and
``__cmp__``).  The feed is read with ``feedparser``; entries are created
and deleted with HTTP requests carrying WSSE ``UsernameToken`` headers.
"""

import base64
import httplib
import re
import sha
import sys
import time
from xml.sax import saxutils

import feedparser

# Passed to HTTPConnection.set_debuglevel() for every request.
HTTP_DEBUG_LEVEL = 0
# Default number of seconds Blogger.snooze() sleeps.
SLEEP_INTERVAL = 7

# NOTE: the original module opened with ``try: bool = True`` /
# ``except: True = 1; False = 0``.  That rebinding shadowed the ``bool``
# builtin (and leaked it through ``from module import *``); True/False are
# builtins on every interpreter able to import this module's dependencies,
# so the shim has been dropped.

_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

# Host part of a URL: everything between "//" and the next "/" (or the end).
_HOST_RE = re.compile(r'//([^/]+)')
_DIV_TAG_RE = re.compile(r'</?div>')
# NOTE(review): this pattern was garbled in the source this file was
# recovered from (the literal contained a raw line break, most likely a
# rendered <br /> tag).  Confirm that <br>/<br/>/<br /> is what should be
# turned into newlines.
_BR_TAG_RE = re.compile(r'<br\s*/?>')

# Errors that mean "there is no such entry / attribute" in the getters.
_LOOKUP_ERRORS = (IndexError, KeyError, AttributeError, TypeError)

# NOTE(review): the markup of this template was stripped from the recovered
# source (only "Blogger Python Generator" and the three %s placeholders
# survived).  It has been rebuilt as an Atom 0.3 entry; verify the element
# names and attributes against the server before relying on it.
# Placeholders, in order: escaped title, creation timestamp, XHTML content.
_ATOM_ENTRY_TEMPLATE = """<?xml version="1.0" encoding="UTF-8" ?>
<entry xmlns="http://purl.org/atom/ns#">
<generator>Blogger Python Generator</generator>
<title mode="escaped" type="text/html">%s</title>
<issued>%s</issued>
<content type="application/xhtml+xml">
<div xmlns="http://www.w3.org/1999/xhtml">%s</div>
</content>
</entry>"""


def _utc_timestamp():
    """Return the current UTC time formatted as YYYY-MM-DDTHH:MM:SSZ."""
    return time.strftime(_TIMESTAMP_FORMAT, time.gmtime())


class BlogParams:
    """
    Encapsulates the main blog parameters such as feed URL, user name and password

    Any argument left as None is replaced by a built-in placeholder value.
    """

    def __init__(self, feed_url=None, user=None, password=None):
        if feed_url is None:
            feed_url = "http://fitnessetesting.blogspot.com/atom.xml"
        if user is None:
            user = "YOUR_BLOGGER_USERNAME_HERE"
        if password is None:
            password = "YOUR_BLOGGER_PASSWORD_HERE"
        self.FEED_URL = feed_url
        self.USER = user
        self.PASSWORD = password


class Entry:
    """
    One blog entry: title, creation date, URL and content.

    Code for the Entry class inspired from http://jonasgalvez.com
    """

    def __init__(self, data):
        """Copy the fields of interest out of a parsed feed entry."""
        self.title = data.title
        # Seconds since the epoch as a float; used for ordering entries.
        self.date = time.mktime(data.created_parsed)
        self.url = data.links[0].href
        self.content = data.content

    def __cmp__(self, other):
        """Order entries newest first.

        ``cmp`` is used instead of subtracting the dates because
        ``__cmp__`` must return an integer and the dates are floats.
        """
        return cmp(other.date, self.date)


class Blogger:
    """Reads a blog feed and posts/deletes entries on it.

    Entry positions used by the ``get_nth_*`` / ``delete_nth_entry``
    methods are 1-based, with entry 1 being the most recent one.
    """

    def __init__(self, blog_params):
        """Store the connection parameters and load the feed.

        blog_params -- a BlogParams instance (FEED_URL, USER, PASSWORD).
        """
        self._feed_url = blog_params.FEED_URL
        self._user = blog_params.USER
        self._password = blog_params.PASSWORD
        self._http_debug_level = HTTP_DEBUG_LEVEL
        # Timestamp of the request being built; set by post/delete.
        self._post_creation_time = None
        self.refresh_feed()

    def refresh_feed(self):
        """Re-parse the feed and rebuild the newest-first entry list."""
        self._data = feedparser.parse(self._feed_url)
        self._feed = self._data.feed
        entries = [Entry(raw_entry) for raw_entry in self._data.entries]
        entries.sort()  # Entry.__cmp__ orders newest first
        self._entries = entries

    def snooze(self, seconds=None):
        """Sleep for `seconds`, or SLEEP_INTERVAL when not given."""
        if seconds is None:
            seconds = SLEEP_INTERVAL
        time.sleep(seconds)

    def get_feed_posting_url(self):
        """Return the href of the feed's first link."""
        return self._feed.links[0].href

    def get_feed_posting_host(self):
        """Return the host name taken from the feed generator URL.

        Returns None when the URL contains no "//host" part.  Only the
        host is returned, with or without a trailing slash in the URL
        (the previous greedy pattern also swallowed path segments and
        required a trailing slash).
        """
        match = _HOST_RE.search(self._feed.generator_detail.url)
        if match is None:
            return None
        return match.group(1)

    def get_title(self):
        """Return the feed title."""
        return self._feed.title

    def get_tagline(self):
        """Return the feed tagline."""
        return self._feed.tagline

    def get_num_entries(self):
        """Return the number of entries currently loaded."""
        return len(self._entries)

    def _entry_at(self, n):
        """Return the n-th entry (1-based); raise IndexError if absent.

        Positions below 1 are rejected explicitly so that 0 or a negative
        number cannot wrap around to the end of the list.
        """
        if n < 1:
            raise IndexError("entry positions start at 1, got %r" % (n,))
        return self._entries[n - 1]

    def get_nth_entry(self, n):
        """Return the n-th Entry (1-based); raise IndexError if absent."""
        return self._entry_at(n)

    def get_nth_entry_title(self, n):
        """Return the title of the n-th entry, or None if unavailable."""
        try:
            return self._entry_at(n).title
        except _LOOKUP_ERRORS:
            return None

    def get_nth_entry_content(self, n):
        """Return the first content value of the n-th entry, or None."""
        try:
            return self._entry_at(n).content[0].value
        except _LOOKUP_ERRORS:
            return None

    def get_nth_entry_content_strip_html(self, n):
        """Return the n-th entry's content with <div> tags removed and
        line-break tags replaced by newlines, or None if unavailable."""
        content = self.get_nth_entry_content(n)
        if content is None:
            return None
        try:
            content = _DIV_TAG_RE.sub('', content)
            return _BR_TAG_RE.sub('\n', content)
        except TypeError:
            # The content value was not a string.
            return None

    def get_nth_entry_url(self, n):
        """Return the URL of the n-th entry, or None if unavailable."""
        try:
            return self._entry_at(n).url
        except _LOOKUP_ERRORS:
            return None

    def get_nonce(self):
        """Return "<timestamp> <sha1 hex digest of 'timestamp:Blogger NONCE'>".

        Code for this method inspired from http://www.daikini.com/source/atomexamples
        """
        private = "Blogger NONCE"
        timestamp = _utc_timestamp()
        digest = sha.new("%s:%s" % (timestamp, private)).hexdigest()
        return "%s %s" % (timestamp, digest)

    def get_post_headers(self):
        """Return the HTTP headers (including X-WSSE) for a write request.

        The creation timestamp set by post_new_entry()/delete_entry_by_url()
        is used; when none has been set yet the current time is used, so
        the method can also be called on its own.

        Code for this method inspired from http://www.daikini.com/source/atomexamples
        """
        created = getattr(self, "_post_creation_time", None)
        if created is None:
            created = self._post_creation_time = _utc_timestamp()
        nonce = self.get_nonce()
        # encodestring() inserts newlines; strip them for header use.
        base64_encoded_nonce = base64.encodestring(nonce).replace("\n", "")
        raw_digest = sha.new(nonce + created + self._password).digest()
        password_digest = base64.encodestring(raw_digest).replace("\n", "")
        authorization_header = (
            'UsernameToken Username="%s", PasswordDigest="%s", '
            'Created="%s", Nonce="%s"'
            % (self._user, password_digest, created, base64_encoded_nonce))
        return {
            "Content-type": "application/atom+xml",
            "Authorization": 'WSSE profile="UsernameToken"',
            "X-WSSE": authorization_header,
            # Was "UserAgent", which is not the standard header name.
            "User-Agent": "Blogger Atom Posting",
        }

    def _send_request(self, method, url, body, headers):
        """Send one request to the posting host and return the HTTP status.

        The connection is always closed, even when the request fails.
        """
        conn = httplib.HTTPConnection(self.get_feed_posting_host())
        try:
            conn.set_debuglevel(self._http_debug_level)
            conn.request(method, url, body, headers)
            response = conn.getresponse()
            response.read()
            return response.status
        finally:
            conn.close()

    def post_new_entry(self, title, content):
        """
        Post a new entry and reload the feed.

        title is XML-escaped before being sent; content is inserted as-is
        because it is sent as XHTML markup.  Returns True when the server
        answered with a 2xx status, False on any error or other status.

        Code for this method inspired from http://www.daikini.com/source/atomexamples

        >>> blog = get_blog()
        >>> title = "Test title"
        >>> content = "Test content"
        >>> init_num_entries = blog.get_num_entries()
        >>> rc = blog.post_new_entry(title, content)
        >>> print rc
        True
        >>> num_entries = blog.get_num_entries()
        >>> num_entries == init_num_entries + 1
        True
        """
        self._post_creation_time = _utc_timestamp()
        headers = self.get_post_headers()
        body = _ATOM_ENTRY_TEMPLATE % (
            saxutils.escape(title), self._post_creation_time, content)
        try:
            status = self._send_request(
                "POST", self.get_feed_posting_url(), body, headers)
        except Exception:
            print("HTTP POST error %s" % (sys.exc_info(),))
            return False
        if not 200 <= status < 300:
            print("HTTP POST error: server returned status %s" % (status,))
            return False
        # Wait before re-reading the feed so the new entry can show up.
        self.snooze(10)
        self.refresh_feed()
        return True

    def delete_entry_by_url(self, entry_url, refresh_feed=1):
        """Delete the entry at entry_url; reload the feed if refresh_feed.

        Returns True when the server answered with a 2xx status, False on
        any error, on another status, or when entry_url is empty (an empty
        URL would otherwise be sent as a DELETE of "/").

        Code for this method inspired from http://www.daikini.com/source/atomexamples
        """
        if not entry_url:
            return False
        self._post_creation_time = _utc_timestamp()
        headers = self.get_post_headers()
        try:
            status = self._send_request("DELETE", entry_url, "", headers)
        except Exception:
            return False
        if not 200 <= status < 300:
            return False
        self.snooze()
        if refresh_feed:
            self.refresh_feed()
        return True

    def delete_nth_entry(self, n):
        """Delete the n-th entry (1-based); return False if it is absent."""
        return self.delete_entry_by_url(self.get_nth_entry_url(n))

    def delete_all_entries(self):
        """
        Delete every loaded entry, then reload the feed once.

        >>> blog = get_blog()
        >>> blog.delete_all_entries()
        >>> print blog.get_num_entries()
        0
        """
        # Iterate over a copy; the feed is refreshed only once at the end.
        for entry in list(self._entries):
            self.delete_entry_by_url(entry.url, refresh_feed=0)
        self.refresh_feed()


# Lazily created, module-wide Blogger instance returned by get_blog().
the_blog = None


def get_blog():
    """Return the shared Blogger, creating it with default BlogParams."""
    global the_blog
    if the_blog is None:
        the_blog = Blogger(BlogParams())
    return the_blog


if __name__ == "__main__":
    import doctest
    doctest.testmod()