import collections
import functools
import logging
import os
import pickle # nosec
import random
from typing import Dict
from pycantonese._punctuation_marks import _PUNCTUATION_MARKS
from pycantonese.pos_tagging.hkcancor_to_ud import hkcancor_to_ud
# Use the highest pickle protocol version that's compatible for all supported
# Python versions.
# Protocol version 4 was added in Python 3.4.
# Protocol version 5 was added in Python 3.8.
# Reference: https://docs.python.org/3/library/pickle.html#data-stream-format
_PICKLE_PROTOCOL = 4
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
_PICKLE_PATH = os.path.join(_THIS_DIR, "tagger.pickle")


class _AveragedPerceptron:
"""An averaged perceptron.

    This is a modified version of the textblob-aptagger codebase
    (MIT license); the original implementation is by Matthew Honnibal:
https://github.com/sloria/textblob-aptagger/blob/266fa1c22daaff7c60577efa8577f1b6ce2f7f70/textblob_aptagger/_perceptron.py
"""

    def __init__(self):
# Each feature (key) gets its own weight vector (value).
self.weights: Dict[str, Dict[str, float]] = {}
self.classes = set()
        # The accumulated values, for the averaging. These are keyed by
        # (feature, class) tuples.
        self._totals = collections.defaultdict(int)
        # The last time each weight was changed, for the averaging. Also
        # keyed by (feature, class) tuples.
        # (tstamps is short for timestamps)
        self._tstamps = collections.defaultdict(int)
# Number of instances seen
self.i = 0

    def predict(self, features):
        """Return the best label for the given features.

        The best label is the one with the highest dot product between the
        features and the current weights.
        """
scores = collections.defaultdict(float)
for feat, value in features.items():
if feat not in self.weights or value == 0:
continue
weights = self.weights[feat]
for label, weight in weights.items():
scores[label] += value * weight
# Do a secondary alphabetic sort, for stability
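        # Labels with no matching features keep the default score of 0.0.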
return max(self.classes, key=lambda label: (scores[label], label))

    def update(self, truth, guess, features):
        """Update the feature weights."""
def upd_feat(c, f, w, v):
param = (f, c)
self._totals[param] += (self.i - self._tstamps[param]) * w
self._tstamps[param] = self.i
self.weights[f][c] = w + v
self.i += 1
if truth == guess:
return None
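        # Move each feature's weight toward the correct tag (+1.0) and away
        # from the wrong guess (-1.0).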
for f in features:
weights = self.weights.setdefault(f, {})
upd_feat(truth, f, weights.get(truth, 0.0), 1.0)
upd_feat(guess, f, weights.get(guess, 0.0), -1.0)

    def average_weights(self):
        """Average weights from all iterations."""
for feat, weights in self.weights.items():
new_feat_weights = {}
for clas, weight in weights.items():
param = (feat, clas)
total = self._totals[param]
total += (self.i - self._tstamps[param]) * weight
averaged = round(total / float(self.i), 3)
if averaged:
new_feat_weights[clas] = averaged
self.weights[feat] = new_feat_weights


class POSTagger:
"""A part-of-speech tagger.

    This is a modified version of the textblob-aptagger codebase
    (MIT license); the original implementation is by Matthew Honnibal:
https://github.com/sloria/textblob-aptagger/blob/266fa1c22daaff7c60577efa8577f1b6ce2f7f70/textblob_aptagger/taggers.py
"""
START = ["-START-", "-START2-"]
END = ["-END-", "-END2-"]

    def __init__(self, *, frequency_threshold=10, ambiguity_threshold=0.95, n_iter=5):
        """Initialize a part-of-speech tagger.

        Parameters
        ----------
frequency_threshold : int, optional
A good number of words are almost unambiguously associated with
a given tag. If these words have a frequency of occurrence above
this threshold in the training data, they are directly associated
with their tag in the model.
ambiguity_threshold : float, optional
A good number of words are almost unambiguously associated with
a given tag. If the ratio of (# of occurrences of this word with
this tag) / (# of occurrences of this word) in the training data
is equal to or greater than this threshold, then this word is
directly associated with the tag in the model.
n_iter : int, optional
Number of times the training phase iterates through the data.
At each new iteration, the data is randomly shuffled.
"""
self.frequency_threshold = frequency_threshold
self.ambiguity_threshold = ambiguity_threshold
self.n_iter = n_iter
self.model = _AveragedPerceptron()
self.tagdict = {}
self.classes = set()
        # HKCanCor doesn't cover the Chinese full-width punctuation marks,
        # so each such mark is hard-coded to be its own tag.
        self.tagdict.update({punct: punct for punct in _PUNCTUATION_MARKS})

    def tag(self, words):
        """Tag the words.

        Parameters
        ----------
        words : list[str]
            A segmented sentence or phrase, where each word is a string of
            Cantonese characters.

        Returns
        -------
        list[str]
            The list of predicted tags.
        """
prev, prev2 = self.START
tags = []
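        # Pad the word sequence with sentinel pseudo-words so that the
        # i-1/i-2 and i+1/i+2 lookups in _get_features never run off either
        # end of the sentence.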
context = self.START + words + self.END
for i, word in enumerate(words):
tag = self.tagdict.get(word)
if not tag:
features = self._get_features(i, word, context, prev, prev2)
tag = self.model.predict(features)
tags.append(tag)
prev2 = prev
prev = tag
return tags

    def train(self, tagged_sents, save=None):
        """Train a model.

        Parameters
        ----------
        tagged_sents : list[list[tuple[str, str]]]
            A list of segmented and tagged sentences for training.
        save : str, optional
            If given, save the trained model as a pickle at this path.
"""
self._make_tagdict(tagged_sents)
self.model.classes = self.classes
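        # Note that prev/prev2 are initialized only once here; they carry
        # over across sentence boundaries and training iterations.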
prev, prev2 = self.START
for iter_ in range(self.n_iter):
c = 0
n = 0
for tagged_sent in tagged_sents:
context = self.START + [w for w, _ in tagged_sent] + self.END
for i, (word, tag) in enumerate(tagged_sent):
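                    # Words covered by the tag dictionary bypass the model,
                    # so they trigger no weight updates.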
try:
guess = self.tagdict[word]
except KeyError:
feats = self._get_features(i, word, context, prev, prev2)
guess = self.model.predict(feats)
self.model.update(tag, guess, feats)
prev2 = prev
prev = guess
c += guess == tag
n += 1
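            # The shuffle is in place, so the caller's list is reordered.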
random.shuffle(tagged_sents)
logging.info("Iter %d: %d / %d = %f", iter_, c, n, c / n)
self.model.average_weights()
        if save is not None:
            with open(save, "wb") as f:
                pickle.dump(
                    (self.model.weights, self.tagdict, self.classes),
                    f,
                    protocol=_PICKLE_PROTOCOL,
                )

    def load(self, path):
        """Load a pickled model.

        Parameters
        ----------
        path : str
            The path where the pickled model is located.
"""
        try:
            with open(path, "rb") as f:
                w_td_c = pickle.load(f)  # nosec
except IOError:
raise FileNotFoundError(f"Can't locate tagger model {path}")
except: # noqa
raise EnvironmentError(
f"A file is detected at {path}, but it cannot be read as a "
"a tagger model. "
"Either the tagger model file object is corrupted for some reason, "
"or - perhaps more likely - you're running pycantonese from a local "
"git repo (e.g., when you are doing dev work) and that you do not have "
"Git LFS installed on your system. "
"In the latter case, please install Git LFS "
"(https://git-lfs.github.com/) and re-install pycantonese."
)
self.model.weights, self.tagdict, self.classes = w_td_c
self.model.classes = self.classes

    def _get_features(self, i, word, context, prev, prev2):
        """Map tokens into a feature representation, implemented as a
        {hashable: float} dict. If the features change, a new model must be
        trained.
        """
def add(name, *args):
features[" ".join((name,) + tuple(args))] += 1
i += len(self.START)
features = collections.defaultdict(int)
# It's useful to have a constant feature,
# which acts sort of like a prior.
add("bias")
add("i word's first char", word[0])
add("i word's final char", word[-1])
add("i-1 word's first char", context[i - 1][0])
add("i-1 word's final char", context[i - 1][-1])
add("i-1 tag", prev)
add("i-2 word's first char", context[i - 2][0])
add("i-2 word's final char", context[i - 2][-1])
add("i-2 tag", prev2)
add("i+1 word's first char", context[i + 1][0])
add("i+1 word's final char", context[i + 1][-1])
add("i+2 word's first char", context[i - 2][0])
add("i+2 word's final char", context[i - 2][-1])
return features

    def _make_tagdict(self, tagged_sents):
        """Make a tag dictionary for single-tag words."""
counts = collections.defaultdict(lambda: collections.defaultdict(int))
for tagged_sent in tagged_sents:
for word, tag in tagged_sent:
counts[word][tag] += 1
self.classes.add(tag)
words = set()
for word, tag_freqs in counts.items():
words.add(word)
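            # `mode` is the count of this word's most frequent tag.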
tag, mode = max(tag_freqs.items(), key=lambda item: item[1])
n = sum(tag_freqs.values())
above_freq_threshold = n >= self.frequency_threshold
unambiguous = (mode / n) >= self.ambiguity_threshold
if above_freq_threshold and unambiguous:
self.tagdict[word] = tag
logging.info("%d unique words in the training data", len(words))
logging.info("%d tags in this tagset", len(self.classes))
logging.info("%d words are treated as having a unique tag", len(self.tagdict))
@functools.lru_cache(maxsize=1)
def _get_tagger():
tagger = POSTagger()
tagger.load(_PICKLE_PATH)
return tagger


def pos_tag(words, tagset="universal"):
    """Tag the words for their parts of speech.

    The part-of-speech tagger uses an averaged perceptron model and is
    trained on the HKCanCor data.

    .. versionadded:: 3.1.0

    Parameters
    ----------
words : list[str]
A segmented sentence or phrase, where each word is a string of
Cantonese characters.
tagset : str, {"universal", "hkcancor"}
The part-of-speech tagset that the returned tags are in.
        Supported options:

        * ``"hkcancor"``, for the tagset used by the original HKCanCor data.
There are over 100 tags, 46 of which are described at
http://compling.hss.ntu.edu.sg/hkcancor/.
* ``"universal"`` (default option), for the Universal Dependencies v2
tagset. There are 17 tags; see
https://universaldependencies.org/u/pos/index.html.
Internally, this option applies
:func:`~pycantonese.pos_tagging.hkcancor_to_ud` to convert HKCanCor
tags to UD tags.

    Returns
    -------
    list[tuple[str, str]]
        The segmented sentence/phrase where each word is paired with its
        predicted POS tag.

    Raises
    ------
    TypeError
        If the input is a string (e.g., an unsegmented string of Cantonese).
    ValueError
        If the ``tagset`` argument is not one of the allowed options from
        ``{"universal", "hkcancor"}``.

    Examples
--------
>>> words = ['我', '噚日', '買', '嗰', '對', '鞋', '。'] # I bought that pair of shoes yesterday.
>>> pos_tag(words)
[('我', 'PRON'), ('噚日', 'ADV'), ('買', 'VERB'), ('嗰', 'PRON'), ('對', 'NOUN'), ('鞋', 'NOUN'), ('。', 'PUNCT')]
>>> pos_tag(words, tagset="hkcancor")
[('我', 'R'), ('噚日', 'T'), ('買', 'V'), ('嗰', 'R'), ('對', 'Q'), ('鞋', 'N'), ('。', '。')]
""" # noqa: E501
    if isinstance(words, str):
raise TypeError(
f"Input must be a list of segmented words, not a string: {words}"
)
tags = _get_tagger().tag(words)
if tagset == "universal":
tags = [hkcancor_to_ud(tag) for tag in tags]
elif tagset != "hkcancor":
raise ValueError(f"tagset must be one of {{'universal', 'hkcancor'}}: {tagset}")
return list(zip(words, tags))