hell-student
Lieutenant
- Registriert
- Nov. 2007
- Beiträge
- 671
Hallo Zusammen,
Ich hänge momentan an einem Problem und komme einfach nicht weiter. Ich habe einen Threadpool von 100 Threads und möchte Jobs zuweisen bzw. wenn ein Job in die Job Queue kommt, werden die wartenden Threads "geweckt" und führen den Job aus. Mein Problem ist, dass ich irgendwie das Füllen bzw. entnehmen eines Jobs nicht richtig hinbekommen habe. Habe mich hier an diesen Link gehalten. Wenn ich Ein Job Element hinzufüge, sollte ein Thread den Job bearbeiten. Da ich einfach nur in jedem Job herunterzähle und dies auch ausgebe, sollte das eigentich passen. Das Problem ist aber das ich viele Jobs hinzufügen kann, aber nur einer bearbeitet wird. Irgendwie hängst einfach beim einfügen und entfernen.
Ich möchte eigentlich eine Linked List haben, bei der der erst eingefügte Job als erstes bearbeitet wird, also einfach immer am Head die Jobs abgreifen. Sollte also eigentlich der einfachste fall sein, eine Liste zu füllen und zu entleeren.
Hier ist der Code:
Per FIFO wird eine Nachricht gesendet, und falls diese "start" lautet, soll ein neuer Job erstellt werden und dann auch abgebarbeitet werden. Sende ich aber nacheinander 2 mal "start" printet mir nur ein Thread was aus, also gehe ich davon aus, dass ich die Jobs nicht richtig einfüge.
Ich hänge momentan an einem Problem und komme einfach nicht weiter. Ich habe einen Threadpool von 100 Threads und möchte Jobs zuweisen bzw. wenn ein Job in die Job Queue kommt, werden die wartenden Threads "geweckt" und führen den Job aus. Mein Problem ist, dass ich irgendwie das Füllen bzw. entnehmen eines Jobs nicht richtig hinbekommen habe. Habe mich hier an diesen Link gehalten. Wenn ich Ein Job Element hinzufüge, sollte ein Thread den Job bearbeiten. Da ich einfach nur in jedem Job herunterzähle und dies auch ausgebe, sollte das eigentich passen. Das Problem ist aber das ich viele Jobs hinzufügen kann, aber nur einer bearbeitet wird. Irgendwie hängst einfach beim einfügen und entfernen.
Ich möchte eigentlich eine Linked List haben, bei der der erst eingefügte Job als erstes bearbeitet wird, also einfach immer am Head die Jobs abgreifen. Sollte also eigentlich der einfachste fall sein, eine Liste zu füllen und zu entleeren.
Hier ist der Code:
Code:
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include "pthread.h"
#include "runtime_system.h"
#define THREADS_PER_SET 100
typedef enum {
UNUSED = 0,
PENDING,
WORKING,
SUCCESS,
FAILURE,
CANCELED
} status_t;
struct invade_job_struct {
struct invade_job_struct *next;
volatile int canceled;
pthread_mutex_t mutex;
status_t status;
/* Job parameters. This one iterates n times sleep(1)
*/
unsigned long n;
};
static pthread_mutex_t work_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t work_cond = PTHREAD_COND_INITIALIZER;
static struct invade_job_struct *invade_queue_head = NULL;
static struct invade_job_struct *invade_queue_ptr = NULL;
static struct invade_job_struct *finished = NULL;
/*
* job worker function
*/
#define DEBUG 1
status_t invade(struct invade_job_struct *const job)
{
volatile int *const canceld = &job->canceled;
unsigned long n = job->n;
while (n > 0UL) {
if (*canceld) {
return CANCELED;
}
n--;
if (DEBUG) {
printf("Thread %u : invade = %lu\n", (unsigned int)pthread_self(), n);
fflush(stdout);
}
}
return SUCCESS;
}
/* Invade thread function
*
*/
void *invade_thread_worker(void *payload __attribute__((unused)))
{
struct invade_job_struct *job;
status_t status;
pthread_mutex_lock(&work_mutex);
while (1) {
/* no job in queue */
if (!invade_queue_head) {
pthread_cond_wait(&work_cond, &work_mutex);
continue;
}
/* grep first job element of invade queue */
job = invade_queue_head;
if (invade_queue_head->next != NULL) {
invade_queue_ptr = invade_queue_head->next;
invade_queue_head = invade_queue_ptr;
}
else {
invade_queue_ptr = NULL;
invade_queue_head = NULL;
}
pthread_mutex_lock(&job->mutex);
if (job->status == PENDING) {
/* work on it */
job->status = WORKING;
pthread_mutex_unlock(&job->mutex);
status = invade(job);
pthread_mutex_lock(&job->mutex);
if (job->status == WORKING) {
job->status = status;
}
}
pthread_mutex_unlock(&job->mutex);
}
}
void cancel_invade_job(struct invade_job_struct *const job)
{
pthread_mutex_lock(&job->mutex);
if (job->status == PENDING || job->status == WORKING) {
job->canceled = ~0;
job->status = CANCELED;
}
pthread_mutex_unlock(&job->mutex);
}
void create_invade_job()
{
struct invade_job_struct *job;
/* empty invade job queue */
if(invade_queue_head == NULL) {
invade_queue_head = (struct invade_job_struct *) calloc(1, sizeof(struct invade_job_struct));
invade_queue_head->status = PENDING;
invade_queue_head->n = 1000;
invade_queue_head->next = NULL;
invade_queue_ptr = invade_queue_head;
printf("created first element\n");
}
else {
invade_queue_ptr = invade_queue_head;
while (invade_queue_ptr->next != NULL) {
invade_queue_ptr = invade_queue_ptr->next;
}
invade_queue_ptr->next = (struct invade_job_struct *) calloc(1, sizeof(struct invade_job_struct));
invade_queue_ptr = invade_queue_ptr->next;
invade_queue_ptr->status = PENDING;
invade_queue_ptr->n = 1000;
invade_queue_ptr->next = NULL;
printf("created element\n");
}
}
#define DEBUG 1
int main()
{
int i;
int recv_bytes;
int fifo_fd;
char fifo_name[100];
char msg[100];
pthread_attr_t attrs;
pthread_t id[THREADS_PER_SET];
sprintf(fifo_name, "runtime_system_fifo%i\0", 1);
mkfifo(fifo_name, 0666);
fifo_fd = open(fifo_name, O_RDONLY | O_NONBLOCK);
pthread_attr_init(&attrs);
pthread_attr_setstacksize(&attrs, 65536);
for (i = 0; i < THREADS_PER_SET; i++) {
pthread_create(&id[i], &attrs, invade_thread_worker, NULL);
}
while(1) {
recv_bytes = read(fifo_fd, msg, sizeof(msg));
msg[100] = '\0';
if(strcmp(msg, "start") == 0) {
pthread_mutex_lock(&work_mutex);
create_invade_job();
pthread_cond_signal(&work_cond);
pthread_mutex_unlock(&work_mutex);
sprintf(msg, "%i", 1);
}
}
return EXIT_SUCCESS;
}
Per FIFO wird eine Nachricht gesendet, und falls diese "start" lautet, soll ein neuer Job erstellt werden und dann auch abgebarbeitet werden. Sende ich aber nacheinander 2 mal "start" printet mir nur ein Thread was aus, also gehe ich davon aus, dass ich die Jobs nicht richtig einfüge.