import feedparser
import httplib
import sha
import base64
import time, re
# Debug level handed to each httplib connection through set_debuglevel();
# Blogger copies it at construction time.  0 keeps httplib's debug output off.
HTTP_DEBUG_LEVEL = 0
class BlogParams:
    """Bundle the settings needed to reach one blog.

    Stores the feed URL, user name and password as FEED_URL, USER and
    PASSWORD.  Any argument left as None is replaced by a placeholder
    string that has to be edited before the settings are usable.
    """

    def __init__(self, feed_url=None, user=None, password=None):
        # Resolve the placeholders first, then assign each attribute once.
        if feed_url is None:
            feed_url = "YOUR_FEED_URL_HERE"
        if user is None:
            user = "YOUR_BLOGGER_USERNAME_HERE"
        if password is None:
            password = "YOUR_BLOGGER_PASSWORD_HERE"

        self.FEED_URL = feed_url
        self.USER = user
        self.PASSWORD = password
class Entry:
    """One feed entry, reduced to the fields this module uses.

    Code for the Entry class inspired from http://jonasgalvez.com

    Attributes copied from the parsed entry passed to the constructor:
      title   -- data.title
      date    -- float seconds from time.mktime(data.created_parsed)
      url     -- href of the entry's first link
      content -- data.content, unchanged

    Entries compare newest-first: the entry with the larger ``date`` sorts
    before the one with the smaller ``date``.
    """

    def __init__(self, data):
        self.title = data.title
        # NOTE(review): mktime() interprets the struct_time as local time.
        # If created_parsed is expressed in UTC, calendar.timegm() would give
        # the true epoch value -- confirm before relying on absolute dates.
        # The relative ordering used by this module is unaffected.
        self.date = time.mktime(data.created_parsed)
        self.url = data.links[0].href
        self.content = data.content

    def __cmp__(self, other):
        """Return -1, 0 or 1 so that the more recent entry sorts first.

        The result is always an int; a raw float difference is not a valid
        __cmp__ result and would treat sub-second gaps as equality when
        truncated.
        """
        return (other.date > self.date) - (other.date < self.date)

    def __lt__(self, other):
        """Order newest-first where sorting uses ``<`` instead of __cmp__."""
        return self.date > other.date
class Blogger:
    """Read a blog's Atom feed and create or delete entries over HTTP.

    The feed is fetched with feedparser.  Posts and deletions are sent with
    httplib and authenticated with a WSSE UsernameToken header.  Positions
    taken by the ``get_nth_*`` / ``delete_nth_entry`` methods are 1-based,
    entry 1 being the most recent one.
    """

    _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    # Host (and optional port) of a URL: everything between "//" and the
    # next "/".  Non-greedy on purpose so a path is never captured.
    _HOST_RE = re.compile(r'//([^/]+)')
    _DIV_RE = re.compile(r'<\/?div>')
    # <br>, <br/> and <br />.
    _BR_RE = re.compile(r'<br\s*/?>')

    # Errors that mean "no such entry / field" in the get_nth_* accessors.
    _LOOKUP_ERRORS = (IndexError, KeyError, AttributeError, TypeError)

    # NOTE(review): the markup of this template was reconstructed following
    # the Atom 0.3 entry format (namespace http://purl.org/atom/ns#) --
    # verify against the posting endpoint.  The XML declaration must be the
    # very first characters of the document.  Title and content are inserted
    # verbatim, so callers must pass XML-safe text.
    _ENTRY_TEMPLATE = (
        '<?xml version="1.0" encoding="UTF-8" ?>\n'
        '<entry xmlns="http://purl.org/atom/ns#">\n'
        '<generator>Blogger Python Generator</generator>\n'
        '<title mode="escaped" type="text/html">%s</title>\n'
        '<issued>%s</issued>\n'
        '<content type="application/xhtml+xml">\n'
        '<div xmlns="http://www.w3.org/1999/xhtml">%s</div>\n'
        '</content>\n'
        '</entry>\n'
    )

    def __init__(self, blog_params):
        """Store the settings from *blog_params* and load the feed.

        blog_params must expose FEED_URL, USER and PASSWORD attributes.
        """
        self._feed_url = blog_params.FEED_URL
        self._user = blog_params.USER
        self._password = blog_params.PASSWORD
        self._http_debug_level = HTTP_DEBUG_LEVEL
        # Set for real right before each authenticated request.
        self._post_creation_time = None
        self.refresh_feed()

    def refresh_feed(self):
        """Re-parse the feed URL and rebuild the entry list, newest first."""
        self._data = feedparser.parse(self._feed_url)
        self._feed = self._data.feed
        self._entries = [Entry(item) for item in self._data.entries]
        # Sort on the date directly so the order does not depend on how
        # Entry implements comparison; reverse=True keeps the sort stable.
        self._entries.sort(key=lambda entry: entry.date, reverse=True)

    def snooze(self, seconds=None):
        """Sleep for *seconds* (3 when omitted)."""
        if seconds is None:
            seconds = 3
        time.sleep(seconds)

    def get_feed_posting_url(self):
        """Return the href of the feed's first link, used as the POST target."""
        return self._feed.links[0].href

    def get_feed_posting_host(self):
        """Return the host part of the feed generator's URL, or None.

        The host is whatever lies between "//" and the next "/" (a port, if
        present, is kept).  None is returned when the URL is empty or has no
        "//" part.
        """
        host_url = self._feed.generator_detail.url
        match = self._HOST_RE.search(host_url or "")
        if match is None:
            return None
        return match.group(1)

    def get_title(self):
        """Return the feed title."""
        return self._feed.title

    def get_tagline(self):
        """Return the feed tagline."""
        return self._feed.tagline

    def get_num_entries(self):
        """Return the number of entries in the parsed feed."""
        return len(self._data.entries)

    def _entry_at(self, n):
        """Return the n-th entry (1-based); raise IndexError when out of range.

        Positions below 1 are rejected explicitly: n - 1 would otherwise
        become a negative index and silently address entries from the end.
        """
        if n < 1:
            raise IndexError("entry positions start at 1")
        return self._entries[n - 1]

    def get_nth_entry(self, n):
        """Return the n-th Entry (1-based); raise IndexError if there is none."""
        return self._entry_at(n)

    def get_nth_entry_title(self, n):
        """Return the title of the n-th entry, or None if it is unavailable."""
        try:
            return self._entry_at(n).title
        except self._LOOKUP_ERRORS:
            return None

    def get_nth_entry_content(self, n):
        """Return the first content value of the n-th entry, or None."""
        try:
            return self._entry_at(n).content[0].value
        except self._LOOKUP_ERRORS:
            return None

    def get_nth_entry_content_strip_html(self, n):
        """Return the n-th entry's content with simple markup removed, or None.

        Bare <div> / </div> tags are dropped and <br> tags become newlines;
        any other markup is left untouched.
        """
        try:
            content = self._entry_at(n).content[0].value
            content = self._DIV_RE.sub('', content)
            return self._BR_RE.sub('\n', content)
        except self._LOOKUP_ERRORS:
            return None

    def get_nth_entry_url(self, n):
        """Return the URL of the n-th entry, or None if it is unavailable."""
        try:
            return self._entry_at(n).url
        except self._LOOKUP_ERRORS:
            return None

    def _utc_timestamp(self):
        """Return the current UTC time formatted as YYYY-MM-DDTHH:MM:SSZ."""
        return time.strftime(self._TIMESTAMP_FORMAT, time.gmtime())

    def get_nonce(self):
        """Return a nonce of the form "<UTC timestamp> <SHA-1 hex digest>".

        Code for this method inspired from
        http://www.daikini.com/source/atomexamples
        """
        private = "Blogger NONCE"
        timestamp = self._utc_timestamp()
        digest = sha.new("%s:%s" % (timestamp, private)).hexdigest()
        return "%s %s" % (timestamp, digest)

    def get_post_headers(self):
        """Return the HTTP headers for an authenticated Atom request.

        The X-WSSE header carries the user name, the base64 SHA-1 digest of
        nonce + creation time + password, the creation time and the base64
        nonce.  The creation time is the one stamped by the current
        post/delete call; if none has been stamped yet, one is created here.

        Code for this method inspired from
        http://www.daikini.com/source/atomexamples
        """
        created = getattr(self, "_post_creation_time", None)
        if created is None:
            created = self._post_creation_time = self._utc_timestamp()
        nonce = self.get_nonce()
        # b64encode yields the same text as encodestring minus the newlines.
        base64_encoded_nonce = base64.b64encode(nonce)
        password_digest = base64.b64encode(
            sha.new(nonce + created + self._password).digest())
        authorization_header = (
            'UsernameToken Username="%s", PasswordDigest="%s", '
            'Created="%s", Nonce="%s"'
            % (self._user, password_digest, created, base64_encoded_nonce))
        return {"Content-type": "application/atom+xml",
                "Authorization": 'WSSE profile="UsernameToken"',
                "X-WSSE": authorization_header,
                "User-Agent": "Blogger Atom Posting"}

    def _send(self, method, body, url=None):
        """Send one authenticated request; return True unless it failed.

        url defaults to the feed's posting URL.  Failure means: the posting
        host could not be determined, the exchange raised, or the server
        answered with a 4xx/5xx status.  The connection is always closed.
        """
        headers = self.get_post_headers()
        try:
            host = self.get_feed_posting_host()
            if host is None:
                return False
            if url is None:
                url = self.get_feed_posting_url()
            conn = httplib.HTTPConnection(host)
            try:
                conn.set_debuglevel(self._http_debug_level)
                conn.request(method, url, body, headers)
                status = conn.getresponse().status
            finally:
                conn.close()
        except Exception:
            # Best effort by design: any feed-lookup or network problem is
            # reported to the caller as a plain False.
            return False
        return status < 400

    def post_new_entry(self, title, content):
        """Post a new entry; return True on success, False otherwise.

        On success the method waits 6 seconds and then reloads the feed so
        the new entry shows up in the entry list.

        Code for this method inspired from
        http://www.daikini.com/source/atomexamples
        """
        self._post_creation_time = self._utc_timestamp()
        body = self._ENTRY_TEMPLATE % (title, self._post_creation_time, content)
        if not self._send("POST", body):
            return False
        self.snooze(6)
        self.refresh_feed()
        return True

    def delete_entry_by_url(self, entry_url, refresh_feed=1):
        """Delete the entry at *entry_url*; return True on success.

        Returns False without sending anything when entry_url is empty or
        None.  On success the method waits (default snooze) and, when
        refresh_feed is true, reloads the feed.

        Code for this method inspired from
        http://www.daikini.com/source/atomexamples
        """
        if not entry_url:
            return False
        self._post_creation_time = self._utc_timestamp()
        if not self._send("DELETE", "", entry_url):
            return False
        self.snooze()
        if refresh_feed:
            self.refresh_feed()
        return True

    def delete_nth_entry(self, n):
        """Delete the n-th entry (1-based); return False if there is none."""
        entry_url = self.get_nth_entry_url(n)
        if entry_url is None:
            return False
        return self.delete_entry_by_url(entry_url)

    def delete_all_entries(self):
        """Try to delete every known entry, then reload the feed once.

        Individual failures are ignored.
        """
        for entry in list(self._entries):
            self.delete_entry_by_url(entry.url, refresh_feed=0)
        self.refresh_feed()
# Module-wide Blogger instance, created on the first get_blog() call.
the_blog = None


def get_blog():
    """Return the shared Blogger, building it from default BlogParams once."""
    global the_blog
    if the_blog is not None:
        return the_blog
    the_blog = Blogger(BlogParams())
    return the_blog
if __name__ == "__main__":
blogger = get_blog()
blogger.post_new_entry("Entry #1 Title", "Entry #1 Content\nLine 2")
print "Title:", blogger.get_nth_entry_title(1)
print "Content:", blogger.get_nth_entry_content(1)
print "Content stripped of HTML tags:\n", blogger.get_nth_entry_content_strip_html(1)
blogger.delete_all_entries()