2016-11-28 4 views
0

나는 생산자 - 소비자 패러다임을 모방 한 프로그램을 구현할 때 문제가 발생했습니다. 내가 사용하는 코드는 오직 하나의 생산자와 하나의 소비자만을 가지고 있지만 다른 생산자와 다른 소비자를 추가 할 때 작동하지 않습니다.C 제작자 - 소비자 (PThreads 사용)

나는 이것에 잠시 머물렀고, 왜 내가 오류 Synchronization Error: Producer x Just overwrote x from Slot x을 얻고 있는지를 알 수없는 것처럼 보입니다. 나는 여러 가지 테스트를 통해이 문제를 추적했으며 문제는 다른 제작자가 중요한 섹션에있을 때 제작자가 차단되지 않는다는 사실에 있습니다.

#include <stdio.h> 
#include <pthread.h> 
#include <semaphore.h> 
#include <stdlib.h> 

void *producer (void *) ; 
void *consumer(void *) ; 
sem_t empty, full, mutex ; 

int buffer[10] /*note extra long space!*/ ; 
int ID[10] ; 
int in = 0 ; int out = 0 ; 
int BUFFER_SIZE = 10 ; 
int nextProduced = 0 ; 

main() { 
    int i ; 
    pthread_t TID[10] ; 

    sem_init(&empty, 0, 10) ; 
    sem_init(&full, 0, 0) ; 
    sem_init(&mutex, 0, 1) ; 

    for(i = 0; i < 10; i++) { 
     ID[i] = i ; 
     buffer[i] = -1 ; 
    } 

    //for(i = 0; i < 5000; i += 2) { 
     pthread_create(&TID[0], NULL, producer, (void *) &ID[0]) ; 
     printf("Producer ID = %d created!\n", 0) ; 
     pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]) ; 
     printf("Consumer ID = %d created!\n", 1) ; 

     pthread_create(&TID[2], NULL, producer, (void *) &ID[2]) ; 
     printf("Producer ID = %d created!\n", 2) ; 
     pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]) ; 
     printf("Consumer ID = %d created!\n", 3) ; 
    //} 

    for(i = 0; i < 10 ; i++) { 
     pthread_join(TID[i], NULL) ; 
    } 
} 

void *producer(void *Boo) { 
    int *ptr; 
    int ID; 
    ptr = (int *) Boo; 
    ID = *ptr; 
    while (1) { 
     nextProduced++; //Producing Integers 
     /* Check to see if Overwriting unread slot */ 
     sem_wait(&empty); 
     sem_wait(&mutex); 

     if (buffer[in] != -1) { 
      printf("Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in); 
      exit(0); 
     } 

     /* Looks like we are OK */ 
     buffer[in] = nextProduced; 
     printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in); 
     in = (in + 1) % BUFFER_SIZE; 
     printf("incremented in!\n"); 

     sem_post(&mutex); 
     sem_post(&full); 
    } 
} 

void *consumer (void *Boo) { 
    static int nextConsumed = 0 ; 
    int *ptr ; 
    int ID ; 
    ptr = (int *) Boo ; 
    ID = *ptr ; 
    while (1) { 
     sem_wait(&full); 
     sem_wait(&mutex); 

     nextConsumed = buffer[out]; 
     /*Check to make sure we did not read from an empty slot*/ 
     if (nextConsumed == -1) { 
      printf("Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out) ; 
      exit(0) ; 
     } 
     /* We must be OK */ 
     printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out) ; 
     buffer[out] = -1 ; 
     out = (out + 1) % BUFFER_SIZE; 

     sem_post(&mutex); 
     sem_post(&empty); 
    } 
} 

OUPUT는 :

Producer ID = 0 created! 
Producer 0. Put 1 in slot 0 
Consumer ID = 1 created! 
incremented in! 
Consumer 1 Just consumed item 1 from slot 0 
Producer ID = 2 created! 
Producer 0. Put 2 in slot 1 
Synchronization Error: Producer 2 Just overwrote 2 from Slot 1 
Consumer 1 Just consumed item 2 from slot 1 
Consumer ID = 3 created! 
incremented in! 
Consumer 3 Just consumed item 2 from slot 1 
Synch Error: Consumer 1 Just Read from empty slot 2 
Producer 0. Put 4 in slot 2 

당신이 볼 수 있듯이, 프로듀서 0 슬롯 1에 데이터를 작성하는 in, 프로듀서 2 개 시도를 증가 할 수 있습니다 프로듀서 0 전에, 그러나 슬롯 1에 2를 넣어 관리 in이 증가하지 않았기 때문입니다.

웬일인지 내 sem_waits()이 작동하지 않는 것 같습니다. 아무도 나를 도울 수 있습니까?

답변

0

내 시스템에서 실행되도록 코드를 수정했으며 정상적으로 작동하는 것 같습니다.

특정 변경 사항 : OSX에서 변경되었으므로 sem_init()에서 sem_open()sem_unlink()으로 변경해야했습니다. 그 변화와 코드를 수용하기 위해 일반적으로 인터럽트 핸들러를 추가하여^C를 입력하면 소비자와 생산자가 마무리하고 pthread_join() 및 다음의 정리 코드를 실행하도록 허용합니다. 프로세스 수를 독립적 인 버퍼 수로 묶었습니다 (ID와 버퍼 초기화 코드 참조). 두 부분을 분리했습니다. 뮤텍스 안에서 nextProduced++을 옮겼습니다. 다양한 무작위 스타일의 비틀기 :

#include <stdio.h> 
#include <pthread.h> 
#include <semaphore.h> 
#include <stdlib.h> 
#include <signal.h> 

#define MAX_THREADS 5 
#define BUFFER_SIZE 10 

sem_t *empty, *full, *mutex; 

int buffer[BUFFER_SIZE]; 
int in = 0, out = 0; 

static volatile int keepRunning = 1; 

void intHandler(int dummy) { 
    keepRunning = 0; 
} 

void *producer(void * id_ptr) { 
    int ID = *((int *) id_ptr); 
    static int nextProduced = 0; 

    while (keepRunning) { 

     (void) sem_wait(empty); 
     (void) sem_wait(mutex); 

     /* Check to see if Overwriting unread slot */ 
     if (buffer[in] != -1) { 
      fprintf(stderr, "Synchronization Error: Producer %d Just overwrote %d from Slot %d\n", ID, buffer[in], in); 
      exit(1); 
     } 

     nextProduced++; // Producing Integers 

     /* Looks like we are OK */ 
     buffer[in] = nextProduced; 
     printf("Producer %d. Put %d in slot %d\n", ID, nextProduced, in); 
     in = (in + 1) % BUFFER_SIZE; 
     printf("incremented in!\n"); 

     (void) sem_post(mutex); 
     (void) sem_post(full); 
    } 

    return NULL; 
} 

void *consumer (void *id_ptr) { 
    int ID = *((int *) id_ptr); 
    static int nextConsumed = 0; 

    while (keepRunning) { 

     (void) sem_wait(full); 
     (void) sem_wait(mutex); 

     nextConsumed = buffer[out]; 

     /* Check to make sure we did not read from an empty slot */ 
     if (nextConsumed == -1) { 
      fprintf(stderr, "Synch Error: Consumer %d Just Read from empty slot %d\n", ID, out); 
      exit(1); 
     } 

     /* We must be OK */ 
     printf("Consumer %d Just consumed item %d from slot %d\n", ID, nextConsumed, out); 
     buffer[out] = -1; 
     out = (out + 1) % BUFFER_SIZE; 
     printf("incremented out!\n"); 

     (void) sem_post(mutex); 
     (void) sem_post(empty); 
    } 

    return NULL; 
} 

int main() { 
    int ID[MAX_THREADS]; 
    pthread_t TID[MAX_THREADS]; 

    empty = sem_open("/empty", O_CREAT, 0644, 10); 
    full = sem_open("/full", O_CREAT, 0644, 0); 
    mutex = sem_open("/mutex", O_CREAT, 0644, 1); 

    signal(SIGINT, intHandler); 

    for (int i = 0; i < MAX_THREADS; i++) { 
     ID[i] = i; 
    } 

    for (int i = 0; i < BUFFER_SIZE; i++) { 
     buffer[i] = -1; 
    } 

    pthread_create(&TID[0], NULL, producer, (void *) &ID[0]); 
    printf("Producer ID = %d created!\n", 0); 
    pthread_create(&TID[1], NULL, consumer, (void *) &ID[1]); 
    printf("Consumer ID = %d created!\n", 1); 

    pthread_create(&TID[2], NULL, producer, (void *) &ID[2]); 
    printf("Producer ID = %d created!\n", 2); 
    pthread_create(&TID[3], NULL, consumer, (void *) &ID[3]); 
    printf("Consumer ID = %d created!\n", 3); 

    for (int i = 0; i < 4; i++) { 
     pthread_join(TID[i], NULL); 
    } 

    (void) sem_unlink("/empty"); 
    (void) sem_unlink("/full"); 
    (void) sem_unlink("/mutex"); 

    return 0; 
} 

OUTPUT

> ./a.out 
Producer ID = 0 created! 
Producer 0. Put 1 in slot 0 
Consumer ID = 1 created! 
incremented in! 
Producer ID = 2 created! 
Producer 0. Put 2 in slot 1 
incremented in! 
Producer 2. Put 3 in slot 2 
incremented in! 
Consumer ID = 3 created! 
Producer 0. Put 4 in slot 3 
incremented in! 
Consumer 1 Just consumed item 1 from slot 0 
incremented out! 
Consumer 3 Just consumed item 2 from slot 1 
incremented out! 
Producer 2. Put 5 in slot 4 
incremented in! 
Producer 0. Put 6 in slot 5 
incremented in! 
Consumer 1 Just consumed item 3 from slot 2 
incremented out! 
Consumer 3 Just consumed item 4 from slot 3 
incremented out! 
Producer 2. Put 7 in slot 6 
incremented in! 
Producer 0. Put 8 in slot 7 
incremented in! 
Consumer 1 Just consumed item 5 from slot 4 
incremented out! 
Consumer 3 Just consumed item 6 from slot 5 
...