Source code for sussex_nltk.corpus_readers

'''
The corpus readers module provides access to corpora not included in the
standard NLTK distribution. The corpora have been prepared by the Text Analytics
Group at the University of Sussex.
'''


import os
import sys
import multiprocessing as mp
import re
import platform
from itertools import chain
import gzip
import random
from string import punctuation 

import nltk
from nltk.corpus.reader.api import CorpusReader
from nltk.tokenize import word_tokenize, sent_tokenize
try:
    import cPickle as pickle
except ImportError:
    import pickle

import sussex_nltk as susx


def get_srl_sent(srl_file):
    # Read one SRL sentence: collect lines up to the next blank line.
    # Also stop on an empty string so we do not loop forever at end of file.
    sent = []
    line = srl_file.readline()
    while line not in ('\n', ''):
        sent.append(line)
        line = srl_file.readline()
    return ''.join(sent)

def pre_process_file(paths):
    in_path, out_path = paths
    print in_path, out_path
    dir = os.path.dirname(out_path)
    if not os.path.exists(dir):
        os.makedirs(dir)
    print "%s writing %s..." % (mp.current_process().name, out_path)
    output_file = open(out_path, 'w')
    for review in reviews_from_file(in_path):
        #print "pre processing %s" % review.unique_id
        review.tokenise_segment()
        #print str(review)
        output_file.write(str(review))
    print "closing %s..." % out_path
    output_file.close()

def _sample_generator(gen, samplesize):
    sample = []
    # Fill in the first samplesize elements:
    try:
        for _ in xrange(samplesize):
            sample.append(gen.next())
    except StopIteration:
        raise ValueError("Sample larger than population.")
    random.shuffle(sample)  # Randomize their positions
    for i, sent in enumerate(gen, start=samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            sample[r] = sent  # at a decreasing rate, replace random items
    return sample

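# Illustrative sketch (not part of the original module): _sample_generator is a
# reservoir sampler, so a fixed-size uniform sample can be drawn from a
# generator of unknown length in a single pass. The toy stream below is
# hypothetical; any generator of items will do.
def _demo_sample_generator():
    def stream():
        for i in xrange(100000):
            yield i
    random.seed(1234)  # fix the seed so the sample is repeatable
    sample = _sample_generator(stream(), 10)
    print sample       # 10 items drawn uniformly from the whole stream
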
def reviews_from_file(fileid, count=[0], start=None, end=None):
    file = open(fileid, 'Ur')
    data = {}
    for line in file:
        if line.startswith('</'):
            node = line.rstrip('>\n ').lstrip('</ ')
            if node == 'review':
                if start is None or count[0] >= start:
                    review = AmazonReview(data)
                    yield review
                data = {}
                count[0] += 1
                if end is not None and count[0] == end:
                    break
        elif line.startswith('<'):
            node = line.rstrip('>\n ').lstrip('< ')
        elif start is None or count[0] >= start:
            try:
                data[node] += line
            except KeyError:
                data[node] = line
    file.close()
    return  # generators should simply return rather than raise StopIteration

class CompressedCorpusReader(CorpusReader):
    """A corpus reader for accessing corpora in a gzip format.
    """
    def __init__(self, fileids=r'.*\.txt', data_folder=''):
        _root = os.path.join(susx._sussex_root, data_folder)
        CorpusReader.__init__(self, _root, fileids)
        self._n = None
        self._n_sents = None

    def raw(self, fileids=None):
        """Returns a generator over the raw documents in the corpus.

        The documents are yielded as raw text strings in the order in which
        they are stored in the corpus files.

        *fileids* is an optional list of file ids that can be used to restrict
        the corpus files from which the documents are generated.
        """
        fileids = fileids if fileids is not None else self._fileids
        doc_count = 0
        for fileid in self.abspaths(fileids):
            corpus = gzip.open(fileid, 'r')
            doc = []
            for line in corpus:
                if line.strip() == '====':  # '====' marks a document boundary
                    doc_count += 1
                    yield ''.join(doc)
                    doc = []
                else:
                    doc.append(line)
            corpus.close()
        return

    def words(self, fileids=None):
        """Returns a generator over the tokens in the corpus.

        The generator iterates over all the sentences in the corpus, visiting
        the documents in an ordered sequence determined by the order in which
        they are returned from the file system. Document boundaries are not
        marked, so the result is a flat sequence of strings.

        *fileids* is an optional list of file ids that can be used to restrict
        the corpus files from which the tokens are generated.
        """
        fileids = fileids if fileids is not None else self._fileids
        for doc in self.raw(fileids):
            for sent in sent_tokenize(doc):
                for word in word_tokenize(sent):
                    yield word

    def sents(self, fileids=None):
        """Returns a generator over the sentences in the corpus.

        The generator iterates over all the sentences in the corpus, visiting
        the documents in an ordered sequence determined by the order in which
        they are returned from the file system. Document boundaries are not
        marked, and each sentence is yielded as a list of strings.

        *fileids* is an optional list of file ids that can be used to restrict
        the corpus files from which the sentences are generated.
        """
        fileids = fileids if fileids is not None else self._fileids
        for doc in self.raw(fileids):
            for sent in sent_tokenize(doc):
                yield word_tokenize(sent)
        return

    def enumerate_sents(self):
        """Returns the number of sentences in the corpus.
        """
        if self._n_sents is not None:
            return self._n_sents
        self._n_sents = 0
        for _ in self.sents():
            self._n_sents += 1
        return self._n_sents

    def enumerate(self):
        """Returns the number of documents in the corpus.
        """
        if self._n is not None:
            return self._n
        self._n = 0
        for _ in self.raw():
            self._n += 1
        return self._n

    def sample_words(self, candno, samplesize=2000):
        """Returns a random sample of words from the corpus.

        The sample is generated by selecting *samplesize* documents from the
        corpus and flattening these documents into a list of strings.

        *candno* is used as the seed to a random number generator to ensure
        unique samples from the corpus.

        *samplesize* is the number of documents that should be sampled from
        the corpus. The method will raise a ``ValueError`` if *samplesize* is
        larger than the population size.
        """
        random.seed(candno)
        sampled_docs = _sample_generator(self.raw(), samplesize)
        result = []
        for doc in sampled_docs:
            for sent in sent_tokenize(doc):
                for word in word_tokenize(sent):
                    result.append(word)
        return result

class TwitterCorpusReader(CompressedCorpusReader):
    """Provides access to tweets about teamGB collected during the London 2012
    Olympics. The corpus spans a roughly 24-hour period between the 7th and
    8th of August.
    """
    def __init__(self, fileids=r'.*\.gz'):
        CompressedCorpusReader.__init__(self, fileids, 'data/twitter')
        self._n = None
        self._doc_count = None

class MedlineCorpusReader(CompressedCorpusReader):
    """The Medline corpus reader provides access to abstracts of medical
    research papers.
    """
    def __init__(self, fileids=r'.*\.gz'):
        CompressedCorpusReader.__init__(self, fileids, 'data/medline')
        self._n = None
        self._doc_count = None

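# Illustrative sketch (not part of the original module): the gzip-backed readers
# above all share the CompressedCorpusReader interface. This assumes the corpus
# data has been installed under the sussex_nltk data root.
def _demo_compressed_reader():
    medline = MedlineCorpusReader()
    print next(medline.raw())[:80]    # start of the first abstract
    print next(medline.sents())       # first sentence as a list of tokens
    print medline.enumerate()         # document count (iterates the whole corpus)
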
class ReutersCorpusReader(CompressedCorpusReader):
    """The ReutersCorpusReader provides access to a subset of the RCV1 corpus.

    The categories provided by the reader are ``'finance'`` and ``'sport'``.
    The documents are stored in a raw format, i.e. they are not sentence
    segmented or POS tagged.

    `RCV1 corpus <http://about.reuters.com/researchandstandards/corpus/>`_
    """
    def __init__(self, fileids=r'.*\.gz'):
        CompressedCorpusReader.__init__(self, fileids, 'data/reuters')
        self._n = None
        self._doc_count = None

    def category(self, cat):
        """Returns a new ReutersCorpusReader over the specified category.

        *cat* should be either ``'finance'`` or ``'sport'``.
        """
        if not cat:
            return self
        return self._reader([cat])

    def finance(self):
        """Returns a ReutersCorpusReader restricted to the ``'finance'``
        category.
        """
        return self._reader(domains=['finance'])

    def sport(self):
        """Returns a ReutersCorpusReader restricted to the ``'sport'``
        category.
        """
        return self._reader(domains=['sport'])

    def _reader(self, domains):
        polarised_fileids = []
        for domain in domains:
            for fileid in self._fileids:
                if domain in fileid:
                    polarised_fileids.append(fileid)
        reader = ReutersCorpusReader(fileids=polarised_fileids)
        return reader

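# Illustrative sketch (not part of the original module): category(), finance()
# and sport() each build a new ReutersCorpusReader restricted to the matching
# gzip files, so the filtered readers can be used like the full one. Assumes the
# Reuters data is present under the sussex_nltk data root.
def _demo_reuters_categories():
    reuters = ReutersCorpusReader()
    finance = reuters.finance()            # equivalent to reuters.category('finance')
    sport = reuters.category('sport')
    print finance.fileids()
    print sport.fileids()
    print finance.sample_words(candno=1, samplesize=10)[:20]
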
class WSJCorpusReader(CorpusReader):
    """The WSJCorpusReader provides access to a subsample of the Penn
    Treebank.

    `Penn Treebank <http://www.cis.upenn.edu/~treebank/>`_
    """
    def __init__(self, fileids=r'.*\.mrg'):
        _root = os.path.join(susx._sussex_root,
                             'data/penn_treebank_npbrac_stanforddeps')
        CorpusReader.__init__(self, _root, fileids)
        self._n = None

    def raw(self, fileids=None):
        """Returns a generator over the raw documents in the corpus.

        The documents are yielded as raw text strings in the order in which
        they are stored in the corpus files. Any markup the documents may
        contain is removed.

        *fileids* is an optional list of file ids that can be used to restrict
        the corpus files from which the documents are generated.
        """
        fileids = fileids if fileids is not None else self._fileids
        for fileid in self.abspaths(fileids):
            with open(fileid, 'r') as doc:
                doc_text = []
                for i, line in enumerate(doc):
                    if len(line.strip()) != 0:
                        token = line.split('\t')[1]
                        if i > 0 and token not in punctuation:
                            token = ' %s' % token
                        doc_text.append(token)
                    elif len(doc_text) > 0:
                        yield ''.join(doc_text)
                        doc_text = []
        return

    def words(self, fileids=None):
        """Returns a generator over the words in the corpus.
        """
        fileids = fileids if fileids is not None else self._fileids
        for fileid in self.abspaths(fileids):
            with open(fileid, 'r') as doc:
                for line in doc:
                    if len(line.strip()) != 0:
                        # The token is the second tab-separated column,
                        # consistent with raw(), sents() and tagged_sents().
                        yield line.split('\t')[1]
        return

    def tagged_words(self, fileids=None):
        """Returns a generator over the tagged words of the corpus.

        Each item is a (word, pos_tag) tuple.
        """
        fileids = fileids if fileids is not None else self._fileids
        for fileid in self.abspaths(fileids):
            with open(fileid, 'r') as doc:
                for line in doc:
                    if len(line.strip()) != 0:
                        # The token is in column 1 and the POS tag in column 2,
                        # as in tagged_sents().
                        parts = line.split('\t')
                        yield (parts[1], parts[2])
        return

    def sents(self, fileids=None):
        """Returns a generator over the sentences in the corpus.

        The corpus files are already tokenised, one token per line with blank
        lines separating sentences, so each sentence is yielded as a list of
        token strings.
        """
        fileids = fileids if fileids is not None else self._fileids
        for fileid in self.abspaths(fileids):
            with open(fileid, 'r') as doc:
                sent = []
                for line in doc:
                    if len(line.strip()) != 0:
                        sent.append(line.split('\t')[1])
                    elif sent:
                        yield sent
                        sent = []  # start a new sentence after each blank line
                if sent:
                    yield sent
        return

    def tagged_sents(self, fileids=None):
        """Returns a generator over the tagged sentences in the corpus.

        Each sentence is yielded as a list of (token, pos_tag) tuples.
        """
        fileids = fileids if fileids is not None else self._fileids
        for fileid in self.abspaths(fileids):
            with open(fileid, 'r') as doc:
                sent = []
                for line in doc:
                    if len(line.strip()) != 0:
                        parts = line.split('\t')
                        sent.append((parts[1], parts[2]))
                    elif sent:
                        yield sent
                        sent = []  # start a new sentence after each blank line
                if sent:
                    yield sent
        return

    def enumerate(self):
        """Returns the number of documents in the corpus.
        """
        if self._n is not None:
            return self._n
        self._n = len(self.fileids())
        return self._n

    def sample_words(self, candno, samplesize=2000):
        """Returns a random sample of words from the corpus.

        The sample is generated by selecting *samplesize* documents from the
        corpus and flattening these documents into a list of strings.

        *candno* is used as the seed to a random number generator to ensure
        unique samples from the corpus.

        *samplesize* is the number of documents that should be sampled from
        the corpus. The method will raise a ``ValueError`` if *samplesize* is
        larger than the population size.
        """
        random.seed(candno)
        sampled_docs = _sample_generator(self.raw(), samplesize)
        result = []
        for doc in sampled_docs:
            for sent in sent_tokenize(doc):
                for word in word_tokenize(sent):
                    result.append(word)
        return result

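# Illustrative sketch (not part of the original module): the WSJ reader exposes
# plain and POS-tagged views over the same one-token-per-line files. Assumes the
# Penn Treebank subsample is present under the sussex_nltk data root.
def _demo_wsj_reader():
    wsj = WSJCorpusReader()
    print wsj.enumerate()              # number of .mrg documents
    print next(wsj.sents())            # first sentence as a list of tokens
    print next(wsj.tagged_sents())     # the same sentence as (token, tag) pairs
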
class AmazonReviewCorpusReader(CorpusReader):
    """The reader provides access to user-written product reviews from
    amazon.com.

    The corpus is categorised into ``'books'``, ``'dvd'``, ``'kitchen'`` and
    ``'electronics'``, and each category is further divided into three
    sentiment classes: ``'positive'``, ``'negative'`` and ``'unlabeled'``.
    Each category contains 1000 reviews for each of the ``'positive'`` and
    ``'negative'`` sentiment classes.
    """
    def __init__(self, root=None, fileids=r'.*\.review'):
        # An explicit *root* allows filtered readers (see _reviews,
        # pre_process_corpus and attach_srl_data) to be constructed over a
        # different directory or file list.
        if root is None:
            root = os.path.join(susx._sussex_root, 'data/amazon_customer_reviews')
        CorpusReader.__init__(self, root, fileids)
        self._n = None

    def category(self, cat):
        """Returns a new AmazonReviewCorpusReader over the specified category.

        *cat* should be one of ``'kitchen'``, ``'dvd'``, ``'books'`` or
        ``'electronics'``.
        """
        if not cat:
            return self
        return self._reviews([cat], ['negative', 'positive', 'unlabeled'])

    def negative(self, domains=['books', 'dvd', 'electronics', 'kitchen']):
        """Returns a new AmazonReviewCorpusReader over the negative reviews.

        *domains* should be a list of categories.
        """
        return self._reviews(domains, ['negative'])

    def positive(self, domains=['books', 'dvd', 'electronics', 'kitchen']):
        """Returns a new AmazonReviewCorpusReader over the positive reviews.

        *domains* should be a list of categories.
        """
        return self._reviews(domains, ['positive'])

    def unlabeled(self, domains=['books', 'dvd', 'electronics', 'kitchen']):
        """Returns a new AmazonReviewCorpusReader over the unlabeled reviews.

        *domains* should be a list of categories.
        """
        return self._reviews(domains, ['unlabeled'])

    def _reviews(self, domains, polarities):
        polarised_fileids = []
        for domain in domains:
            for polarity in polarities:
                for fileid in self._fileids:
                    if fileid.startswith(domain) and fileid.endswith(polarity + ".review"):
                        polarised_fileids.append(fileid)
                        #return self.reviews(self.abspath(fileid))
        return AmazonReviewCorpusReader(self.root, polarised_fileids)

    def words(self):
        """Generator to return all words as a flat list.
        """
        for fileid in self.abspaths(self._fileids):
            for review in reviews_from_file(fileid):
                for word in review.words():
                    yield word
        return

    def sents(self):
        """Generator to return all sentences as a list of list of strings.
        """
        for fileid in self.abspaths(self._fileids):
            for review in reviews_from_file(fileid):
                for sent in review.sents():
                    yield sent

    def raw(self):
        """Generator to return the raw text of the reviews.
        """
        for fileid in self.abspaths(self._fileids):
            for review in reviews_from_file(fileid):
                yield review.raw()
        return

    def pre_process_corpus(self, output_dir, replace_self=False):
        n_cpu = mp.cpu_count()
        print "%d cores detected, using all of them." % (n_cpu)
        # construct arguments in tuples for mapped functions
        inputs = [(self.abspath(fileid), os.path.join(output_dir, fileid))
                  for fileid in self._fileids]
        pool = mp.Pool(processes=n_cpu)
        pool.map(pre_process_file, inputs, chunksize=n_cpu)
        pool.close()
        pool.join()
        print 'pre-processing complete'
        if replace_self:
            tmp = AmazonReviewCorpusReader(output_dir, self._fileids)
            self.__dict__ = tmp.__dict__

    def attach_srl_data(self, srl_path, output_dir, replace_self=False):
        srl_file = open(srl_path, 'r')
        for fileid in self._fileids:
            path = os.path.join(output_dir, fileid)
            dir = os.path.dirname(path)
            if not os.path.exists(dir):
                os.makedirs(dir)
            print "writing %s..." % path
            output_file = open(path, 'w')
            for review in reviews_from_file(self.abspath(fileid)):
                # AmazonReview stores its fields in the _data dictionary.
                print "pre processing %s: %s" % (path, review._data['unique_id'])
                #print review.n_sents
                #review.srl = ''
                #for i in range(review.n_sents):
                #    review.srl += get_srl_sent(srl_file)
                review._data['srl'] = "\n".join(
                    [get_srl_sent(srl_file) for i in range(review._data['n_sents'])])
                #print review.srl
                output_file.write(str(review))
            print "closing %s..." % path
            output_file.close()
        srl_file.close()
        print 'pre-processing complete'
        if replace_self:
            tmp = AmazonReviewCorpusReader(output_dir, self._fileids)
            self.__dict__ = tmp.__dict__

    def enumerate(self):
        """Returns the number of review documents in the corpus.
        """
        if self._n is not None:
            return self._n
        self._n = 0
        for r in self.documents():
            self._n += 1
        return self._n

    # def raw_documents(self, start=None, end=None):
    #     count = [0]
    #     for fileid in self._fileids:
    #         if end is not None and count[0] >= end:
    #             break
    #         for review in reviews_from_file(self.abspath(fileid), count, start, end):
    #             yield review.raw()

    def documents(self, start=None, end=None):
        """Generator over the documents in the corpus. Returns AmazonReview
        objects.
        """
        count = [0]
        for fileid in self._fileids:
            if not count[0] % 1000 and count[0]:
                #print "[%d]" % (count[0])
                pass
            if end is not None and count[0] >= end:
                break
            for review in reviews_from_file(self.abspath(fileid), count, start, end):
                yield review

    def sample_words(self, candno, samplesize=2000):
        """Returns a random sample of words from the corpus.

        The sample is generated by selecting *samplesize* documents from the
        corpus and flattening these documents into a list of strings.

        *candno* is used as the seed to a random number generator to ensure
        unique samples from the corpus.

        *samplesize* is the number of documents that should be sampled from
        the corpus. The method will raise a ``ValueError`` if *samplesize* is
        larger than the population size.
        """
        random.seed(candno)
        sampled_docs = _sample_generator(self.raw(), samplesize)
        result = []
        for doc in sampled_docs:
            for sent in sent_tokenize(doc):
                for word in word_tokenize(sent):
                    result.append(word)
        return result

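# Illustrative sketch (not part of the original module): the Amazon reader can be
# narrowed by product category and sentiment class, and documents() yields
# AmazonReview objects rather than plain strings. Assumes the review data is
# present under the sussex_nltk data root.
def _demo_amazon_reader():
    arcr = AmazonReviewCorpusReader()
    dvd_positive = arcr.category('dvd').positive()   # positive DVD reviews only
    print dvd_positive.enumerate()                   # 1000 per the class docstring above
    review = next(dvd_positive.documents())          # an AmazonReview object
    print review.rating(), review.raw()[:80]
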
class AmazonReview(object):
    def __init__(self, data):
        self._sents = {}
        self._data = {}
        self._data['unique_id'] = data['unique_id']
        self._data['asin'] = data['asin']
        self._data['product_name'] = data['product_name']
        self._data['product_type'] = data['product_type']
        self._data['helpful'] = data['helpful']
        self._data['rating'] = float(data['rating'])
        self._data['title'] = data['title']
        self._data['date'] = data['date']
        self._data['reviewer'] = data['reviewer']
        self._data['reviewer_location'] = data['reviewer_location']
        self._data['review_text'] = data['review_text']
        self._data['review_text_tokenised_segmented'] = '' if 'review_text_tokenised_segmented' not in data else data['review_text_tokenised_segmented']
        self._data['srl'] = '' if 'srl' not in data else data['srl']
        self._data['n_sents'] = '' if 'n_sents' not in data else int(data['n_sents'])
        #print self.__dict__

    def __str__(self):
        """
        Output format is sensitive to newlines and whitespace.
        Strings have newlines included, numbers do not.
        """
        template = """<review>
<unique_id>
%(unique_id)s</unique_id>
<asin>
%(asin)s</asin>
<product_name>
%(product_name)s</product_name>
<product_type>
%(product_type)s</product_type>
<helpful>
%(helpful)s</helpful>
<rating>
%(rating)s
</rating>
<title>
%(title)s</title>
<date>
%(date)s</date>
<reviewer>
%(reviewer)s</reviewer>
<reviewer_location>
%(reviewer_location)s</reviewer_location>
<review_text>
%(review_text)s</review_text>
<review_text_tokenised_segmented>
%(review_text_tokenised_segmented)s
</review_text_tokenised_segmented>
<n_sents>
%(n_sents)s
</n_sents>
<srl>
%(srl)s</srl>
</review>
""" % (self._data)
        return template

    def rating(self):
        return self._data['rating']

    def format_sentences_string(self, word_limit=70):
        if not self._sents:
            self.tokenise_segment(word_limit)
        one_sent_per_line = "\n".join((" ".join(sent) for sent in self._sents))
        if one_sent_per_line:
            #print sent_per_line
            self._data['n_sents'] = one_sent_per_line.count('\n') + 1
        else:
            self._data['n_sents'] = 0
        #print self.n_sents
        self._data['review_text_tokenised_segmented'] = one_sent_per_line

    def tagged_sents(self):
        srl = self._data['srl']
        sents = srl.split("\n\n")
        tagged_sents = []
        for sent in sents:
            tokens = sent.split("\n")
            #print tokens[0].split("\t")
            tagged_sent = [(token.split("\t")[1], token.split("\t")[4])
                           for token in tokens if token]
            tagged_sents.append(tagged_sent)
        return tagged_sents

    def tokenise_segment(self, word_limit=0):
        self._sents = [sent for sent in
                       (nltk.word_tokenize(sent)
                        for sent in nltk.sent_tokenize(self._data['review_text']))
                       if not word_limit or len(sent) <= word_limit]

    def raw(self):
        return self._data['review_text']

    def words(self):
        if not self._sents:
            self.tokenise_segment()
        words = [word for word in chain(*self._sents)]
        return words

    def sents(self):
        if not self._sents:
            self.tokenise_segment()
        return self._sents

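# Illustrative sketch (not part of the original module): an AmazonReview
# tokenises its text lazily, so words() and sents() call tokenise_segment() on
# first use. Assumes the review data is installed under the sussex_nltk data root.
def _demo_amazon_review():
    review = next(AmazonReviewCorpusReader().documents())
    print review.rating()      # star rating as a float
    print review.sents()[:2]   # first two sentences, each a list of tokens
    print len(review.words())  # total number of tokens in the review text
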
def elapsed(start):
    return time() - start

if __name__ == '__main__':
    susx._set_root('/usr/local/scratch/LanguageEngineering/')
    from sussex_nltk.funcs import *
    import random
    from nltk.text import Text
    from time import time

    reuters = ReutersCorpusReader()
    start = time()
    print 'Reuters:', reuters.enumerate(), 'sents:', len(list(reuters.sents())), '(%1.2f s)' % (elapsed(start)), 'words:', len(list(reuters.words())), '(%1.2f s)' % (elapsed(start))
    start = time()
    print 'Reuters (finance):', reuters.finance().enumerate(), 'sents:', len(list(reuters.finance().sents())), '(%1.2f s)' % (elapsed(start)), 'words:', len(list(reuters.finance().words())), '(%1.2f s)' % (elapsed(start))
    start = time()
    print 'Reuters (sport):', reuters.sport().enumerate(), 'sents:', len(list(reuters.sport().sents())), '(%1.2f s)' % (elapsed(start)), 'words:', len(list(reuters.sport().words())), '(%1.2f s)' % (elapsed(start))

    medline = MedlineCorpusReader()
    start = time()
    print 'Medline:', medline.enumerate(), 'sents:', len(list(medline.sents())), '(%1.2f s)' % (elapsed(start)), 'words:', len(list(medline.words())), '(%1.2f s)' % (elapsed(start))

    wsj = WSJCorpusReader()
    start = time()
    print 'WSJ:', wsj.enumerate(), 'sents:', len(list(wsj.sents())), '(%1.2f s)' % (elapsed(start)), 'words:', len(list(wsj.words())), '(%1.2f s)' % (elapsed(start))

    twitter = TwitterCorpusReader()
    start = time()
    print 'Twitter:', twitter.enumerate(), 'sents:', len(list(twitter.sents())), '(%1.2f s)' % (elapsed(start)), 'words:', len(list(twitter.words())), '(%1.2f s)' % (elapsed(start))

    amazon = AmazonReviewCorpusReader()
    start = time()
    print 'Amazon:', amazon.enumerate(), 'sents:', len(list(amazon.sents())), '(%1.2f s)' % (elapsed(start)), 'words:', len(list(amazon.words())), '(%1.2f s)' % (elapsed(start))