import logging |
import logging |
|
|
import ckan.plugins as plugins |
import ckan.plugins as plugins |
import ckan.lib as lib |
import ckan.lib as lib |
import ckan.lib.dictization.model_dictize as model_dictize |
import ckan.lib.dictization.model_dictize as model_dictize |
import ckan.plugins.toolkit as tk |
import ckan.plugins.toolkit as tk |
import ckan.model as model |
import ckan.model as model |
from pylons import config |
from pylons import config |
|
|
#parse the activity feed for last active non-system user |
#parse the activity feed for last active non-system user |
def get_last_active_user(id): |
def get_last_active_user(id): |
system_user = lib.helpers.get_action('user_show',{'id': config.get('ckan.site_id', 'ckan_site_user')}) |
system_user = lib.helpers.get_action('user_show',{'id': config.get('ckan.site_id', 'ckan_site_user')}) |
user_list = [x for x in lib.helpers.get_action('package_activity_list',{'id':id}) if x['user_id'] != system_user['id']] |
user_list = [x for x in lib.helpers.get_action('package_activity_list',{'id':id}) if x['user_id'] != system_user['id']] |
user = None |
user = None |
if len(user_list) > 0: |
if len(user_list) > 0: |
user = user_list[0].get('user_id', None) |
user = user_list[0].get('user_id', None) |
if user is None: |
if user is None: |
return system_user |
return system_user |
else: |
else: |
return lib.helpers.get_action('user_show',{'id':user}) |
return lib.helpers.get_action('user_show',{'id':user}) |
|
|
# get user created datasets and those they have edited |
# get user created datasets and those they have edited |
def get_user_datasets(user_dict): |
def get_user_datasets(user_dict): |
created_datasets_list = user_dict['datasets'] |
created_datasets_list = user_dict['datasets'] |
active_datasets_list = [x['data']['package'] for x in |
active_datasets_list = [x['data']['package'] for x in |
lib.helpers.get_action('user_activity_list',{'id':user_dict['id']}) if x['data'].get('package')] |
lib.helpers.get_action('user_activity_list',{'id':user_dict['id']}) if x['data'].get('package')] |
return created_datasets_list + active_datasets_list |
return created_datasets_list + active_datasets_list |
|
|
|
|
class ExampleIDatasetFormPlugin(plugins.SingletonPlugin, |
class DataGovAuPlugin(plugins.SingletonPlugin, |
tk.DefaultDatasetForm): |
tk.DefaultDatasetForm): |
'''An example IDatasetForm CKAN plugin. |
'''An example IDatasetForm CKAN plugin. |
|
|
Uses a tag vocabulary to add a custom metadata field to datasets. |
Uses a tag vocabulary to add a custom metadata field to datasets. |
|
|
''' |
''' |
plugins.implements(plugins.IConfigurer, inherit=False) |
plugins.implements(plugins.IConfigurer, inherit=False) |
plugins.implements(plugins.IDatasetForm, inherit=False) |
plugins.implements(plugins.IDatasetForm, inherit=False) |
plugins.implements(plugins.ITemplateHelpers, inherit=False) |
plugins.implements(plugins.ITemplateHelpers, inherit=False) |
|
plugins.implements(plugins.IAuthFunctions) |
|
|
# These record how many times methods that this plugin's methods are |
def datastore_search(context, data_dict): |
# called, for testing purposes. |
return {'success': True} # allow all |
num_times_new_template_called = 0 |
|
num_times_read_template_called = 0 |
def get_auth_functions(self): |
num_times_edit_template_called = 0 |
return {'datastore_search': datastore_search} |
num_times_search_template_called = 0 |
|
num_times_history_template_called = 0 |
|
num_times_package_form_called = 0 |
|
num_times_check_data_dict_called = 0 |
|
num_times_setup_template_variables_called = 0 |
|
|
|
|
|
def update_config(self, config): |
def update_config(self, config): |
# Add this plugin's templates dir to CKAN's extra_template_paths, so |
# Add this plugin's templates dir to CKAN's extra_template_paths, so |
# that CKAN will use this plugin's custom templates. |
# that CKAN will use this plugin's custom templates. |
# here = os.path.dirname(__file__) |
# here = os.path.dirname(__file__) |
# rootdir = os.path.dirname(os.path.dirname(here)) |
# rootdir = os.path.dirname(os.path.dirname(here)) |
|
|
tk.add_template_directory(config, 'templates') |
tk.add_template_directory(config, 'templates') |
tk.add_public_directory(config, 'theme/public') |
tk.add_public_directory(config, 'theme/public') |
tk.add_resource('theme/public', 'ckanext-datagovau') |
tk.add_resource('theme/public', 'ckanext-datagovau') |
# config['licenses_group_url'] = 'http://%(ckan.site_url)/licenses.json' |
# config['licenses_group_url'] = 'http://%(ckan.site_url)/licenses.json' |
|
|
def get_helpers(self): |
def get_helpers(self): |
return {'get_last_active_user': get_last_active_user, 'get_user_datasets': get_user_datasets} |
return {'get_last_active_user': get_last_active_user, 'get_user_datasets': get_user_datasets} |
|
|
def is_fallback(self): |
def is_fallback(self): |
# Return True to register this plugin as the default handler for |
# Return True to register this plugin as the default handler for |
# package types not handled by any other IDatasetForm plugin. |
# package types not handled by any other IDatasetForm plugin. |
return True |
return True |
|
|
def package_types(self): |
def package_types(self): |
# This plugin doesn't handle any special package types, it just |
# This plugin doesn't handle any special package types, it just |
# registers itself as the default (above). |
# registers itself as the default (above). |
return [] |
return [] |
|
|
|
|
def create_package_schema(self): |
def create_package_schema(self): |
schema = super(ExampleIDatasetFormPlugin, self).create_package_schema() |
schema = super(ExampleIDatasetFormPlugin, self).create_package_schema() |
schema = self._modify_package_schema(schema) |
schema = self._modify_package_schema(schema) |
return schema |
return schema |
|
|
def update_package_schema(self): |
def update_package_schema(self): |
schema = super(ExampleIDatasetFormPlugin, self).update_package_schema() |
schema = super(ExampleIDatasetFormPlugin, self).update_package_schema() |
schema = self._modify_package_schema(schema) |
schema = self._modify_package_schema(schema) |
return schema |
return schema |
|
|
def show_package_schema(self): |
def show_package_schema(self): |
schema = super(ExampleIDatasetFormPlugin, self).show_package_schema() |
schema = super(ExampleIDatasetFormPlugin, self).show_package_schema() |
|
|
# Don't show vocab tags mixed in with normal 'free' tags |
# Don't show vocab tags mixed in with normal 'free' tags |
# (e.g. on dataset pages, or on the search page) |
# (e.g. on dataset pages, or on the search page) |
schema['tags']['__extras'].append(tk.get_converter('free_tags_only')) |
schema['tags']['__extras'].append(tk.get_converter('free_tags_only')) |
|
|
# Add our custom_text field to the dataset schema. |
# Add our custom_text field to the dataset schema. |
# ignore_missing == optional |
# ignore_missing == optional |
# ignore_empty == mandatory but not for viewing |
# ignore_empty == mandatory but not for viewing |
# !!! always convert_from_extras first |
# !!! always convert_from_extras first |
schema.update({ |
schema.update({ |
'agency_program': [tk.get_converter('convert_from_extras'), |
'agency_program': [tk.get_converter('convert_from_extras'), |
tk.get_validator('ignore_missing')], |
tk.get_validator('ignore_missing')], |
'contact_point': [tk.get_converter('convert_from_extras'), |
'contact_point': [tk.get_converter('convert_from_extras'), |
tk.get_validator('ignore_empty')], |
tk.get_validator('ignore_empty')], |
'spatial_coverage': [tk.get_converter('convert_from_extras'), |
'spatial_coverage': [tk.get_converter('convert_from_extras'), |
tk.get_validator('ignore_empty')], |
tk.get_validator('ignore_empty')], |
'granularity': [tk.get_converter('convert_from_extras'), |
'granularity': [tk.get_converter('convert_from_extras'), |
tk.get_validator('ignore_empty')], |
tk.get_validator('ignore_empty')], |
'jurisdiction': [tk.get_converter('convert_from_extras'), |
'jurisdiction': [tk.get_converter('convert_from_extras'), |
tk.get_validator('ignore_empty')], |
tk.get_validator('ignore_empty')], |
'temporal_coverage': [tk.get_converter('convert_from_extras'), |
'temporal_coverage': [tk.get_converter('convert_from_extras'), |
tk.get_validator('ignore_empty')], |
tk.get_validator('ignore_empty')], |
'data_state': [tk.get_converter('convert_from_extras'), |
'data_state': [tk.get_converter('convert_from_extras'), |
tk.get_validator('ignore_empty')], |
tk.get_validator('ignore_empty')], |
'update_freq': [tk.get_converter('convert_from_extras'), |
'update_freq': [tk.get_converter('convert_from_extras'), |
tk.get_validator('ignore_empty')] |
tk.get_validator('ignore_empty')] |
}) |
}) |
return schema |
return schema |
|
|
def _modify_package_schema(self, schema): |
def _modify_package_schema(self, schema): |
# Add our custom_test metadata field to the schema, this one will use |
# Add our custom_test metadata field to the schema, this one will use |
# convert_to_extras instead of convert_to_tags. |
# convert_to_extras instead of convert_to_tags. |
# ignore_missing == optional |
# ignore_missing == optional |
# not_empty == mandatory, enforced here while modifying |
# not_empty == mandatory, enforced here while modifying |
|
|
schema.update({ |
schema.update({ |
'agency_program': [tk.get_validator('ignore_missing'), |
'agency_program': [tk.get_validator('ignore_missing'), |
tk.get_converter('convert_to_extras')], |
tk.get_converter('convert_to_extras')], |
'contact_point': [tk.get_converter('convert_to_extras'), |
'contact_point': [tk.get_converter('convert_to_extras'), |
tk.get_validator('not_empty')], |
tk.get_validator('not_empty')], |
'spatial_coverage': [tk.get_converter('convert_to_extras'), |
'spatial_coverage': [tk.get_converter('convert_to_extras'), |
tk.get_validator('not_empty')], |
tk.get_validator('not_empty')], |
'granularity': [tk.get_converter('convert_to_extras'), |
'granularity': [tk.get_converter('convert_to_extras'), |
tk.get_validator('not_empty')], |
tk.get_validator('not_empty')], |
'jurisdiction': [tk.get_converter('convert_to_extras'), |
'jurisdiction': [tk.get_converter('convert_to_extras'), |
tk.get_validator('not_empty')], |
tk.get_validator('not_empty')], |
'temporal_coverage': [tk.get_converter('convert_to_extras'), |
'temporal_coverage': [tk.get_converter('convert_to_extras'), |
tk.get_validator('not_empty')], |
tk.get_validator('not_empty')], |
'data_state': [tk.get_converter('convert_to_extras'), |
'data_state': [tk.get_converter('convert_to_extras'), |
tk.get_validator('not_empty')], |
tk.get_validator('not_empty')], |
'update_freq': [tk.get_converter('convert_to_extras'), |
'update_freq': [tk.get_converter('convert_to_extras'), |
tk.get_validator('not_empty')] |
tk.get_validator('not_empty')] |
}) |
}) |
return schema |
return schema |
|
|
# These methods just record how many times they're called, for testing |
# These methods just record how many times they're called, for testing |
# purposes. |
# purposes. |
# TODO: It might be better to test that custom templates returned by |
# TODO: It might be better to test that custom templates returned by |
# these methods are actually used, not just that the methods get |
# these methods are actually used, not just that the methods get |
# called. |
# called. |
|
|
def setup_template_variables(self, context, data_dict): |
def setup_template_variables(self, context, data_dict): |
return super(ExampleIDatasetFormPlugin, self).setup_template_variables( |
return super(ExampleIDatasetFormPlugin, self).setup_template_variables( |
context, data_dict) |
context, data_dict) |
|
|
def new_template(self): |
def new_template(self): |
return super(ExampleIDatasetFormPlugin, self).new_template() |
return super(ExampleIDatasetFormPlugin, self).new_template() |
|
|
def read_template(self): |
def read_template(self): |
return super(ExampleIDatasetFormPlugin, self).read_template() |
return super(ExampleIDatasetFormPlugin, self).read_template() |
|
|
def edit_template(self): |
def edit_template(self): |
return super(ExampleIDatasetFormPlugin, self).edit_template() |
return super(ExampleIDatasetFormPlugin, self).edit_template() |
|
|
def search_template(self): |
def search_template(self): |
return super(ExampleIDatasetFormPlugin, self).search_template() |
return super(ExampleIDatasetFormPlugin, self).search_template() |
|
|
def history_template(self): |
def history_template(self): |
return super(ExampleIDatasetFormPlugin, self).history_template() |
return super(ExampleIDatasetFormPlugin, self).history_template() |
|
|
def package_form(self): |
def package_form(self): |
return super(ExampleIDatasetFormPlugin, self).package_form() |
return super(ExampleIDatasetFormPlugin, self).package_form() |
|
|
|
|