netbox/netbox/core/views.py

548 lines
17 KiB
Python

from django.apps import apps
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.mixins import UserPassesTestMixin
from django.core.cache import cache
from django.http import HttpResponseForbidden, Http404
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from django.views.generic import View
from django_rq.queues import get_queue_by_index, get_redis_connection
from django_rq.settings import QUEUES_MAP, QUEUES_LIST
from django_rq.utils import get_jobs, get_statistics, stop_jobs
from rq import requeue_job
from rq.exceptions import NoSuchJobError
from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
from rq.registry import (
DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, ScheduledJobRegistry, StartedJobRegistry,
)
from rq.worker import Worker
from rq.worker_registration import clean_worker_registry
from netbox.config import get_config, PARAMS
from netbox.views import generic
from netbox.views.generic.base import BaseObjectView
from netbox.views.generic.mixins import TableMixin
from utilities.forms import ConfirmationForm
from utilities.htmx import htmx_partial
from utilities.query import count_related
from utilities.views import ContentTypePermissionRequiredMixin, register_model_view
from . import filtersets, forms, tables
from .models import *
#
# Data sources
#
class DataSourceListView(generic.ObjectListView):
queryset = DataSource.objects.annotate(
file_count=count_related(DataFile, 'source')
)
filterset = filtersets.DataSourceFilterSet
filterset_form = forms.DataSourceFilterForm
table = tables.DataSourceTable
@register_model_view(DataSource)
class DataSourceView(generic.ObjectView):
queryset = DataSource.objects.all()
def get_extra_context(self, request, instance):
related_models = (
(DataFile.objects.restrict(request.user, 'view').filter(source=instance), 'source_id'),
)
return {
'related_models': related_models,
}
@register_model_view(DataSource, 'sync')
class DataSourceSyncView(BaseObjectView):
queryset = DataSource.objects.all()
def get_required_permission(self):
return 'core.sync_datasource'
def get(self, request, pk):
# Redirect GET requests to the object view
datasource = get_object_or_404(self.queryset, pk=pk)
return redirect(datasource.get_absolute_url())
def post(self, request, pk):
datasource = get_object_or_404(self.queryset, pk=pk)
job = datasource.enqueue_sync_job(request)
messages.success(request, f"Queued job #{job.pk} to sync {datasource}")
return redirect(datasource.get_absolute_url())
@register_model_view(DataSource, 'edit')
class DataSourceEditView(generic.ObjectEditView):
queryset = DataSource.objects.all()
form = forms.DataSourceForm
@register_model_view(DataSource, 'delete')
class DataSourceDeleteView(generic.ObjectDeleteView):
queryset = DataSource.objects.all()
class DataSourceBulkImportView(generic.BulkImportView):
queryset = DataSource.objects.all()
model_form = forms.DataSourceImportForm
class DataSourceBulkEditView(generic.BulkEditView):
queryset = DataSource.objects.annotate(
count_files=count_related(DataFile, 'source')
)
filterset = filtersets.DataSourceFilterSet
table = tables.DataSourceTable
form = forms.DataSourceBulkEditForm
class DataSourceBulkDeleteView(generic.BulkDeleteView):
queryset = DataSource.objects.annotate(
count_files=count_related(DataFile, 'source')
)
filterset = filtersets.DataSourceFilterSet
table = tables.DataSourceTable
#
# Data files
#
class DataFileListView(generic.ObjectListView):
queryset = DataFile.objects.defer('data')
filterset = filtersets.DataFileFilterSet
filterset_form = forms.DataFileFilterForm
table = tables.DataFileTable
actions = {
'bulk_delete': {'delete'},
}
@register_model_view(DataFile)
class DataFileView(generic.ObjectView):
queryset = DataFile.objects.all()
@register_model_view(DataFile, 'delete')
class DataFileDeleteView(generic.ObjectDeleteView):
queryset = DataFile.objects.all()
class DataFileBulkDeleteView(generic.BulkDeleteView):
queryset = DataFile.objects.defer('data')
filterset = filtersets.DataFileFilterSet
table = tables.DataFileTable
#
# Jobs
#
class JobListView(generic.ObjectListView):
queryset = Job.objects.all()
filterset = filtersets.JobFilterSet
filterset_form = forms.JobFilterForm
table = tables.JobTable
actions = {
'export': {'view'},
'bulk_delete': {'delete'},
}
class JobView(generic.ObjectView):
queryset = Job.objects.all()
class JobDeleteView(generic.ObjectDeleteView):
queryset = Job.objects.all()
class JobBulkDeleteView(generic.BulkDeleteView):
queryset = Job.objects.all()
filterset = filtersets.JobFilterSet
table = tables.JobTable
#
# Config Revisions
#
class ConfigView(generic.ObjectView):
queryset = ConfigRevision.objects.all()
def get_object(self, **kwargs):
revision_id = cache.get('config_version')
try:
return ConfigRevision.objects.get(pk=revision_id)
except ConfigRevision.DoesNotExist:
# Fall back to using the active config data if no record is found
return ConfigRevision(
data=get_config().defaults
)
class ConfigRevisionListView(generic.ObjectListView):
queryset = ConfigRevision.objects.all()
filterset = filtersets.ConfigRevisionFilterSet
filterset_form = forms.ConfigRevisionFilterForm
table = tables.ConfigRevisionTable
@register_model_view(ConfigRevision)
class ConfigRevisionView(generic.ObjectView):
queryset = ConfigRevision.objects.all()
class ConfigRevisionEditView(generic.ObjectEditView):
queryset = ConfigRevision.objects.all()
form = forms.ConfigRevisionForm
@register_model_view(ConfigRevision, 'delete')
class ConfigRevisionDeleteView(generic.ObjectDeleteView):
queryset = ConfigRevision.objects.all()
class ConfigRevisionBulkDeleteView(generic.BulkDeleteView):
queryset = ConfigRevision.objects.all()
filterset = filtersets.ConfigRevisionFilterSet
table = tables.ConfigRevisionTable
class ConfigRevisionRestoreView(ContentTypePermissionRequiredMixin, View):
def get_required_permission(self):
return 'core.configrevision_edit'
def get(self, request, pk):
candidate_config = get_object_or_404(ConfigRevision, pk=pk)
# Get the current ConfigRevision
config_version = get_config().version
current_config = ConfigRevision.objects.filter(pk=config_version).first()
params = []
for param in PARAMS:
params.append((
param.name,
current_config.data.get(param.name, None),
candidate_config.data.get(param.name, None)
))
return render(request, 'core/configrevision_restore.html', {
'object': candidate_config,
'params': params,
})
def post(self, request, pk):
if not request.user.has_perm('core.configrevision_edit'):
return HttpResponseForbidden()
candidate_config = get_object_or_404(ConfigRevision, pk=pk)
candidate_config.activate()
messages.success(request, f"Restored configuration revision #{pk}")
return redirect(candidate_config.get_absolute_url())
#
# Background Tasks (RQ)
#
class BaseRQView(UserPassesTestMixin, View):
def test_func(self):
return self.request.user.is_staff
class BackgroundQueueListView(TableMixin, BaseRQView):
table = tables.BackgroundQueueTable
def get(self, request):
data = get_statistics(run_maintenance_tasks=True)["queues"]
table = self.get_table(data, request, bulk_actions=False)
return render(request, 'core/rq_queue_list.html', {
'table': table,
})
class BackgroundTaskListView(TableMixin, BaseRQView):
table = tables.BackgroundTaskTable
def get_table_data(self, request, queue, status):
jobs = []
# Call get_jobs() to returned queued tasks
if status == RQJobStatus.QUEUED:
return queue.get_jobs()
# For other statuses, determine the registry to list (or raise a 404 for invalid statuses)
try:
registry_cls = {
RQJobStatus.STARTED: StartedJobRegistry,
RQJobStatus.DEFERRED: DeferredJobRegistry,
RQJobStatus.FINISHED: FinishedJobRegistry,
RQJobStatus.FAILED: FailedJobRegistry,
RQJobStatus.SCHEDULED: ScheduledJobRegistry,
}[status]
except KeyError:
raise Http404
registry = registry_cls(queue.name, queue.connection)
job_ids = registry.get_job_ids()
if status != RQJobStatus.DEFERRED:
jobs = get_jobs(queue, job_ids, registry)
else:
# Deferred jobs require special handling
for job_id in job_ids:
try:
jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer))
except NoSuchJobError:
pass
if jobs and status == RQJobStatus.SCHEDULED:
for job in jobs:
job.scheduled_at = registry.get_scheduled_time(job)
return jobs
def get(self, request, queue_index, status):
queue = get_queue_by_index(queue_index)
data = self.get_table_data(request, queue, status)
table = self.get_table(data, request, False)
# If this is an HTMX request, return only the rendered table HTML
if htmx_partial(request):
return render(request, 'htmx/table.html', {
'table': table,
})
return render(request, 'core/rq_task_list.html', {
'table': table,
'queue': queue,
'status': status,
})
class BackgroundTaskView(BaseRQView):
def get(self, request, job_id):
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
try:
exc_info = job._exc_info
except AttributeError:
exc_info = None
return render(request, 'core/rq_task.html', {
'queue': queue,
'job': job,
'queue_index': queue_index,
'dependency_id': job._dependency_id,
'exc_info': exc_info,
})
class BackgroundTaskDeleteView(BaseRQView):
def get(self, request, job_id):
if not request.htmx:
return redirect(reverse('core:background_queue_list'))
form = ConfirmationForm(initial=request.GET)
return render(request, 'htmx/delete_form.html', {
'object_type': 'background task',
'object': job_id,
'form': form,
'form_url': reverse('core:background_task_delete', kwargs={'job_id': job_id})
})
def post(self, request, job_id):
form = ConfirmationForm(request.POST)
if form.is_valid():
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
# Remove job id from queue and delete the actual job
queue.connection.lrem(queue.key, 0, job.id)
job.delete()
messages.success(request, f'Deleted job {job_id}')
else:
messages.error(request, f'Error deleting job: {form.errors[0]}')
return redirect(reverse('core:background_queue_list'))
class BackgroundTaskRequeueView(BaseRQView):
def get(self, request, job_id):
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
messages.success(request, f'You have successfully requeued: {job_id}')
return redirect(reverse('core:background_task', args=[job_id]))
class BackgroundTaskEnqueueView(BaseRQView):
def get(self, request, job_id):
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
try:
# _enqueue_job is new in RQ 1.14, this is used to enqueue
# job regardless of its dependencies
queue._enqueue_job(job)
except AttributeError:
queue.enqueue_job(job)
# Remove job from correct registry if needed
if job.get_status() == RQJobStatus.DEFERRED:
registry = DeferredJobRegistry(queue.name, queue.connection)
registry.remove(job)
elif job.get_status() == RQJobStatus.FINISHED:
registry = FinishedJobRegistry(queue.name, queue.connection)
registry.remove(job)
elif job.get_status() == RQJobStatus.SCHEDULED:
registry = ScheduledJobRegistry(queue.name, queue.connection)
registry.remove(job)
messages.success(request, f'You have successfully enqueued: {job_id}')
return redirect(reverse('core:background_task', args=[job_id]))
class BackgroundTaskStopView(BaseRQView):
def get(self, request, job_id):
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
try:
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
except NoSuchJobError:
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
queue_index = QUEUES_MAP[job.origin]
queue = get_queue_by_index(queue_index)
stopped, _ = stop_jobs(queue, job_id)
if len(stopped) == 1:
messages.success(request, f'You have successfully stopped {job_id}')
else:
messages.error(request, f'Failed to stop {job_id}')
return redirect(reverse('core:background_task', args=[job_id]))
class WorkerListView(TableMixin, BaseRQView):
table = tables.WorkerTable
def get_table_data(self, request, queue):
clean_worker_registry(queue)
all_workers = Worker.all(queue.connection)
workers = [worker for worker in all_workers if queue.name in worker.queue_names()]
return workers
def get(self, request, queue_index):
queue = get_queue_by_index(queue_index)
data = self.get_table_data(request, queue)
table = self.get_table(data, request, False)
# If this is an HTMX request, return only the rendered table HTML
if htmx_partial(request):
if not request.htmx.target:
table.embedded = True
# Hide selection checkboxes
if 'pk' in table.base_columns:
table.columns.hide('pk')
return render(request, 'htmx/table.html', {
'table': table,
'queue': queue,
})
return render(request, 'core/rq_worker_list.html', {
'table': table,
'queue': queue,
})
class WorkerView(BaseRQView):
def get(self, request, key):
# all the RQ queues should use the same connection
config = QUEUES_LIST[0]
worker = Worker.find_by_key('rq:worker:' + key, connection=get_redis_connection(config['connection_config']))
# Convert microseconds to milliseconds
worker.total_working_time = worker.total_working_time / 1000
return render(request, 'core/rq_worker.html', {
'worker': worker,
'job': worker.get_current_job(),
'total_working_time': worker.total_working_time * 1000,
})
#
# Plugins
#
class PluginListView(UserPassesTestMixin, View):
def test_func(self):
return self.request.user.is_staff
def get(self, request):
plugins = [
# Look up app config by package name
apps.get_app_config(plugin.rsplit('.', 1)[-1]) for plugin in settings.PLUGINS
]
table = tables.PluginTable(plugins, user=request.user)
table.configure(request)
return render(request, 'core/plugin_list.html', {
'plugins': plugins,
'active_tab': 'api-tokens',
'table': table,
})