commit 7e9555cb31958d49af27328c5da869db9a253c86 Author: limin Date: Sat Oct 11 21:34:09 2014 +0000 Optimize shared memory window creation with pt2pt exchange in RMA code path git-svn-id: http://localhost/svn/mpi/mvapich2/branches/exp7@8305 09bc9535-d30e-0410-b1f7-d46b20a4725c diff --git a/src/mpid/ch3/channels/mrail/include/mpidi_ch3_pre.h b/src/mpid/ch3/channels/mrail/include/mpidi_ch3_pre.h index b66995d..b055008 100644 --- a/src/mpid/ch3/channels/mrail/include/mpidi_ch3_pre.h +++ b/src/mpid/ch3/channels/mrail/include/mpidi_ch3_pre.h @@ -323,6 +323,7 @@ typedef pthread_mutex_t MPIDI_CH3I_SHM_MUTEX; #define MPIDI_CH3_WIN_DECL \ int fall_back; \ + int shm_win_pt2pt; /* nonzero: shm window set-up uses pt2pt messages instead of shmem collectives */ \ int enable_fast_path; \ int use_rdma_path; \ int is_active; \ @@ -367,7 +368,8 @@ typedef pthread_mutex_t MPIDI_CH3I_SHM_MUTEX; MPIDI_CH3I_SHM_MUTEX *shm_mutex; /* shared memory windows -- lock for \ accumulate/atomic operations */ \ MPIU_SHMW_Hnd_t shm_mutex_segment_handle; /* handle to interprocess mutex memory \ - region */ + region */ \ + int *shm_l2g_rank; /* pt2pt mode only: node-local id (vc->smp.local_nodes) -> pg_rank, filled by mv2_init_rank_for_barrier */ #endif /* defined(CHANNEL_MRAIL) */ #endif /* !defined(MPICH_MPIDI_CH3_PRE_H_INCLUDED) */ diff --git a/src/mpid/ch3/channels/mrail/src/gen2/ibv_param.c b/src/mpid/ch3/channels/mrail/src/gen2/ibv_param.c index 652173c..4ad76d8 100644 --- a/src/mpid/ch3/channels/mrail/src/gen2/ibv_param.c +++ b/src/mpid/ch3/channels/mrail/src/gen2/ibv_param.c @@ -1009,6 +1009,10 @@ int rdma_set_smp_parameters(struct mv2_MPIDI_CH3I_RDMA_Process_t *proc) #endif #endif + proc->shm_win_pt2pt = (value = /* MV2_USE_SHM_WIN_PT2PT: opt-in, 0 when unset */ + getenv("MV2_USE_SHM_WIN_PT2PT")) != + NULL ? !!atoi(value) : 0; + /* Set Limic Thresholds */ set_limic_thresholds(proc); @@ -1351,6 +1355,10 @@ int rdma_get_control_parameters(struct mv2_MPIDI_CH3I_RDMA_Process_t *proc) } #endif + proc->shm_win_pt2pt = (value = /* NOTE(review): also parsed in rdma_set_smp_parameters; confirm both startup paths need it */ + getenv("MV2_USE_SHM_WIN_PT2PT")) != + NULL ? 
!!atoi(value) : 0;
+
 #if !defined(DISABLE_PTMALLOC)
     proc->has_lazy_mem_unregister = (value =
             getenv("MV2_USE_LAZY_MEM_UNREGISTER")) !=
diff --git a/src/mpid/ch3/channels/mrail/src/gen2/rdma_iba_1sc.c b/src/mpid/ch3/channels/mrail/src/gen2/rdma_iba_1sc.c
index 7471c90..1763ca2 100644
--- a/src/mpid/ch3/channels/mrail/src/gen2/rdma_iba_1sc.c
+++ b/src/mpid/ch3/channels/mrail/src/gen2/rdma_iba_1sc.c
@@ -840,6 +840,96 @@ fn_exit:
     return complete;
 }
 
+
+/*
+ * mv2_init_rank_for_barrier - build the node-local-id -> rank map used by
+ * MPIDI_CH3I_barrier_in_rma for shared-memory windows created with
+ * shm_win_pt2pt set.
+ *
+ * Allocates (*win_ptr)->shm_l2g_rank with g_smpi.num_local_nodes entries
+ * (ibv_error_abort on failure) and, for every rank of MPI_COMM_WORLD whose VC
+ * has a node-local id (vc->smp.local_nodes != -1), stores that rank's
+ * pg_rank at index local_nodes.  MPIDI_CH3_SHM_Win_free releases the map.
+ */
+void mv2_init_rank_for_barrier (MPID_Win ** win_ptr)
+{
+    int i, comm_size;
+    MPIDI_VC_t *vc = NULL;
+    MPID_Comm *comm_ptr = NULL;
+
+    MPIU_Assert(win_ptr != NULL);
+    MPID_Comm_get_ptr(MPI_COMM_WORLD, comm_ptr);
+    comm_size = comm_ptr->local_size;
+
+    (*win_ptr)->shm_l2g_rank = (int *)
+        MPIU_Malloc(g_smpi.num_local_nodes * sizeof(int));
+    if ((*win_ptr)->shm_l2g_rank == NULL) {
+        ibv_error_abort (GEN_EXIT_ERR,
+                "rdma_iba_1sc: error allocating shm_l2g_rank");
+    }
+
+    for (i = 0; i < comm_size; i++) {
+        MPIDI_Comm_get_vc(comm_ptr, i, &vc);
+        if (vc->smp.local_nodes != -1) {
+            (*win_ptr)->shm_l2g_rank[vc->smp.local_nodes] = vc->pg_rank;
+        }
+    }
+}
+
+/*
+ * MPIDI_CH3I_barrier_in_rma - barrier over the processes of this node, used
+ * instead of MPIR_Barrier_impl(node_comm) when shm_win_pt2pt is set (that
+ * mode never looks up the shmem node communicator).
+ *
+ * Dissemination pattern over node-local ids 0..node_size-1: in the round
+ * with distance `mask` (1, 2, 4, ...) each process sends a zero-byte message
+ * to local id (me + mask) % node_size and receives one from local id
+ * (me - mask + node_size) % node_size.  Local ids are translated to
+ * MPI_COMM_WORLD ranks through (*win_ptr)->shm_l2g_rank, so
+ * mv2_init_rank_for_barrier must already have run for this window.
+ * `rank` and `comm_size` are unused; they are kept for the existing callers.
+ *
+ * NOTE(review): callers pass g_smpi.num_local_nodes as node_size, so the
+ * barrier spans every process on the node in MPI_COMM_WORLD, not only the
+ * members of the window's communicator.  A window over a communicator that
+ * leaves out some node-local process would presumably block here -- confirm
+ * the option is only used with node-complete communicators.
+ */
+int MPIDI_CH3I_barrier_in_rma(MPID_Win **win_ptr, int rank, int node_size, int comm_size)
+{
+    int lsrc, ldst, src, dst, mask;
+    int mpi_errno = MPI_SUCCESS;
+    int errflag = FALSE;
+    int l_rank = g_smpi.my_local_id;
+    MPI_Comm comm = MPI_COMM_WORLD;
+
+    MPIU_Assert(win_ptr != NULL);
+    /* Trivial barriers return immediately */
+    if (node_size == 1) goto fn_exit;
+
+    for (mask = 0x1; mask < node_size; mask <<= 1) {
+        ldst = (l_rank + mask) % node_size;
+        lsrc = (l_rank - mask + node_size) % node_size;
+
+        src = (*win_ptr)->shm_l2g_rank[lsrc];
+        dst = (*win_ptr)->shm_l2g_rank[ldst];
+
+        /* MPIC_Sendrecv takes errflag by pointer (like MPIR_Barrier_impl);
+         * it must point at real storage, not NULL. */
+        mpi_errno = MPIC_Sendrecv(NULL, 0, MPI_BYTE, dst,
+                MPIR_BARRIER_TAG, NULL, 0, MPI_BYTE,
+                src, MPIR_BARRIER_TAG, comm,
+                MPI_STATUS_IGNORE, &errflag);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    }
+    MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
 
 /* Go through RMA op
list once, and start as many RMA ops as possible */ void MPIDI_CH3I_RDMA_try_rma(MPID_Win * win_ptr, int target_rank) diff --git a/src/mpid/ch3/channels/mrail/src/gen2/rdma_impl.h b/src/mpid/ch3/channels/mrail/src/gen2/rdma_impl.h index cbad601..7d74e51 100644 --- a/src/mpid/ch3/channels/mrail/src/gen2/rdma_impl.h +++ b/src/mpid/ch3/channels/mrail/src/gen2/rdma_impl.h @@ -76,6 +76,7 @@ typedef struct mv2_MPIDI_CH3I_RDMA_Process_t { uint8_t has_ring_startup; uint8_t has_lazy_mem_unregister; uint8_t has_one_sided; + uint8_t shm_win_pt2pt; /* from MV2_USE_SHM_WIN_PT2PT; copied into each shm window at allocation */ uint8_t has_flush; int maxtransfersize; int global_used_send_cq; diff --git a/src/mpid/ch3/channels/mrail/src/rdma/ch3_win_fns.c b/src/mpid/ch3/channels/mrail/src/rdma/ch3_win_fns.c index 797be3c..491b358 100644 --- a/src/mpid/ch3/channels/mrail/src/rdma/ch3_win_fns.c +++ b/src/mpid/ch3/channels/mrail/src/rdma/ch3_win_fns.c @@ -20,6 +20,8 @@ #include "mpidimpl.h" #include "mpiinfo.h" #include "mpidrma.h" +#include "mpimem.h" +#include "rdma_impl.h" /* mv2_MPIDI_CH3I_RDMA_Process_t (shm_win_pt2pt) */ #include "coll_shmem.h" #include "bcast_tuning.h" @@ -36,6 +38,9 @@ MPIU_INSTR_DURATION_EXTERN_DECL(wincreate_allgather); static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info *info, MPID_Comm *comm_ptr, void *base_ptr, MPID_Win **win_ptr); +#define SYNC_WIN_HND 111 /* pt2pt mode: tag for the serialized window segment handle */ +#define SYNC_WIN_MUTEX 112 /* pt2pt mode: tag for the serialized mutex segment handle */ + #undef FUNCNAME #define FUNCNAME MPIDI_CH3_Win_shared_query #undef FCNAME @@ -119,13 +124,20 @@ int MPIDI_CH3_SHM_Win_free(MPID_Win **win_ptr) that are on the same node as this process (node_comm). If node_comm == NULL, this process is the only one on this node, therefore we use comm_self as node comm.
*/
-    MPI_Comm shmem_comm;
-    shmem_comm = (*win_ptr)->comm_ptr->ch.shmem_comm;
-    MPID_Comm_get_ptr(shmem_comm, node_comm_ptr);
-    MPIU_Assert(node_comm_ptr != NULL);
-    if (node_comm_ptr->rank == 0) {
-        MPIDI_CH3I_SHM_MUTEX_DESTROY(*win_ptr);
+    if (likely(!(*win_ptr)->shm_win_pt2pt)) {
+        MPI_Comm shmem_comm;
+        shmem_comm = (*win_ptr)->comm_ptr->ch.shmem_comm;
+        MPID_Comm_get_ptr(shmem_comm, node_comm_ptr);
+        MPIU_Assert(node_comm_ptr != NULL);
+        if (node_comm_ptr->rank == 0) {
+            MPIDI_CH3I_SHM_MUTEX_DESTROY(*win_ptr);
+        }
+    } else {
+        if (g_smpi.my_local_id == 0) { /* pt2pt mode: node_rank was g_smpi.my_local_id at allocation */
+            MPIDI_CH3I_SHM_MUTEX_DESTROY(*win_ptr);
+        }
+        MPIU_Free((*win_ptr)->shm_l2g_rank); /* allocated by mv2_init_rank_for_barrier */
     }
 
     /* detach from shared memory segment */
@@ -165,6 +177,88 @@ int MPIDI_CH3_Win_fns_init(MPIDI_CH3U_Win_fns_t *win_fns)
 }
 
 
+/* send_sync_msgs: pt2pt replacement for MPIR_Shmem_Bcast_MV2 of a serialized
+ * shm handle: sends MPIU_SHMW_GHND_SZ bytes with tag `tag` to every other rank
+ * of the window comm whose VC has smp.local_rank != -1 and waits for the sends.
+ * Sent on the collective context so user receives on the comm cannot match. */
+static int send_sync_msgs (MPID_Win **win_ptr, int comm_size, char *serialized_hnd_ptr, int tag)
+{
+    int i, mpi_errno = MPI_SUCCESS;
+    MPI_Request *req;
+    MPIU_CHKLMEM_DECL(2);
+    MPI_Status *status;
+    MPIDI_VC_t *vc = NULL;
+    MPIU_CHKLMEM_MALLOC(req, MPI_Request *, comm_size*sizeof(MPI_Request), mpi_errno, "req");
+    MPIU_CHKLMEM_MALLOC(status, MPI_Status *, comm_size*sizeof(MPI_Status), mpi_errno, "status");
+
+    for (i = 0; i < comm_size; i++) {
+        MPID_Request *req_ptr;
+
+        req[i] = MPI_REQUEST_NULL;
+        MPIDI_Comm_get_vc((*win_ptr)->comm_ptr, i, &vc);
+        /* Nothing to send to ourselves or to ranks whose VC has smp.local_rank == -1. */
+        if (vc->pg_rank == MPIDI_Process.my_pg_rank || vc->smp.local_rank == -1)
+            continue;
+
+        mpi_errno = MPID_Isend(serialized_hnd_ptr, MPIU_SHMW_GHND_SZ, MPI_BYTE,
+                i, tag, (*win_ptr)->comm_ptr,
+                MPID_CONTEXT_INTRA_COLL, &req_ptr);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        req[i] = req_ptr->handle;
+    }
+
+    mpi_errno = MPIR_Waitall_impl(comm_size, req, status);
+    if (mpi_errno && mpi_errno != MPI_ERR_IN_STATUS) MPIU_ERR_POP(mpi_errno);
+
+    /* --BEGIN ERROR HANDLING-- */
+    if (mpi_errno == MPI_ERR_IN_STATUS) {
+        for (i = 0; i < comm_size; i++) {
+            if (status[i].MPI_ERROR != MPI_SUCCESS) {
+                mpi_errno = status[i].MPI_ERROR;
+                MPIU_ERR_POP(mpi_errno);
+            }
+        }
+    }
+    /* --END ERROR HANDLING-- */
+
+fn_exit:
+    MPIU_CHKLMEM_FREEALL();
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+/* recv_sync_msgs: receiving side of send_sync_msgs; blocks until a handle with
+ * tag `tag` arrives from any source on the window comm's collective context. */
+static int recv_sync_msgs (MPID_Win **win_ptr, char *serialized_hnd, int tag)
+{
+    int mpi_errno = MPI_SUCCESS;
+    MPI_Request req[1];
+    MPI_Status status[1];
+    MPID_Request *req_ptr;
+
+    mpi_errno = MPID_Irecv(serialized_hnd, MPIU_SHMW_GHND_SZ, MPI_BYTE, MPI_ANY_SOURCE, tag,
+            (*win_ptr)->comm_ptr, MPID_CONTEXT_INTRA_COLL, &req_ptr);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    req[0] = req_ptr->handle;
+
+    mpi_errno = MPIR_Waitall_impl(1, req, status);
+    if (mpi_errno && mpi_errno != MPI_ERR_IN_STATUS) MPIU_ERR_POP(mpi_errno);
+    /* --BEGIN ERROR HANDLING-- */
+    if (mpi_errno == MPI_ERR_IN_STATUS) {
+        if (status[0].MPI_ERROR != MPI_SUCCESS) {
+            mpi_errno = status[0].MPI_ERROR;
+            MPIU_ERR_POP(mpi_errno);
+        }
+    }
+    /* --END ERROR HANDLING-- */
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
 #undef FUNCNAME
 #define FUNCNAME MPIDI_CH3I_Win_allocate_shm
 #undef FCNAME
@@ -223,55 +317,64 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info *
 
     /* This node comm only works with hydra, it doesn't work when using mpirun_rsh, so call this
      * function to create shm comm */
-    if (!mv2_enable_shmem_collectives && (*win_ptr)->shm_coll_comm_ref == -1) {
-        /* Shared memory for collectives */
-        mpi_errno = MPIDI_CH3I_SHMEM_COLL_init(MPIDI_Process.my_pg,
-                g_smpi.my_local_id);
-        if (mpi_errno) {
-            MPIU_ERR_POP(mpi_errno);
-        }
+    (*win_ptr)->shm_win_pt2pt = mv2_MPIDI_CH3I_RDMA_Process.shm_win_pt2pt; /* from MV2_USE_SHM_WIN_PT2PT */
 
-        /* local barrier */
-        mpi_errno = MPIR_Barrier_impl(comm_ptr, &errflag);
-        if (mpi_errno) {
-            MPIU_ERR_POP(mpi_errno);
-        }
+    if (likely(!(*win_ptr)->shm_win_pt2pt)) {
+        if (!mv2_enable_shmem_collectives && (*win_ptr)->shm_coll_comm_ref == -1) {
+            /* Shared memory for collectives */
+            mpi_errno = MPIDI_CH3I_SHMEM_COLL_init(MPIDI_Process.my_pg,
+                    g_smpi.my_local_id);
+            if (mpi_errno) {
MPIU_ERR_POP(mpi_errno); + } - /* Memory Mapping shared files for collectives*/ - mpi_errno = MPIDI_CH3I_SHMEM_COLL_Mmap(MPIDI_Process.my_pg, - g_smpi.my_local_id); - if (mpi_errno) { - MPIU_ERR_POP(mpi_errno); - } + /* local barrier */ + mpi_errno = MPIR_Barrier_impl(comm_ptr, &errflag); + if (mpi_errno) { + MPIU_ERR_POP(mpi_errno); + } - /* local barrier */ - mpi_errno = MPIR_Barrier_impl(comm_ptr, &errflag); - if (mpi_errno) { - MPIU_ERR_POP(mpi_errno); - } + /* Memory Mapping shared files for collectives*/ + mpi_errno = MPIDI_CH3I_SHMEM_COLL_Mmap(MPIDI_Process.my_pg, + g_smpi.my_local_id); + if (mpi_errno) { + MPIU_ERR_POP(mpi_errno); + } - /* Unlink mapped files so that they get cleaned up when - * * process exits */ - MPIDI_CH3I_SHMEM_COLL_Unlink(); - (*win_ptr)->shm_coll_comm_ref = 1; - } else if ((*win_ptr)->shm_coll_comm_ref > 0) { - (*win_ptr)->shm_coll_comm_ref++; - } + /* local barrier */ + mpi_errno = MPIR_Barrier_impl(comm_ptr, &errflag); + if (mpi_errno) { + MPIU_ERR_POP(mpi_errno); + } - if((*win_ptr)->comm_ptr->ch.shmem_coll_ok == 0) - mpi_errno = create_2level_comm((*win_ptr)->comm_ptr->handle, (*win_ptr)->comm_ptr->local_size, (*win_ptr)->comm_ptr->rank); - if(mpi_errno) { - MPIU_ERR_POP(mpi_errno); - } + /* Unlink mapped files so that they get cleaned up when + * * process exits */ + MPIDI_CH3I_SHMEM_COLL_Unlink(); + (*win_ptr)->shm_coll_comm_ref = 1; + } else if ((*win_ptr)->shm_coll_comm_ref > 0) { + (*win_ptr)->shm_coll_comm_ref++; + } + + if((*win_ptr)->comm_ptr->ch.shmem_coll_ok == 0) + mpi_errno = create_2level_comm((*win_ptr)->comm_ptr->handle, + (*win_ptr)->comm_ptr->local_size, (*win_ptr)->comm_ptr->rank); + if(mpi_errno) { + MPIU_ERR_POP(mpi_errno); + } - shmem_comm = (*win_ptr)->comm_ptr->ch.shmem_comm; - MPID_Comm_get_ptr(shmem_comm, node_comm_ptr); + shmem_comm = (*win_ptr)->comm_ptr->ch.shmem_comm; + MPID_Comm_get_ptr(shmem_comm, node_comm_ptr); - MPIU_Assert(node_comm_ptr != NULL); - - node_size = node_comm_ptr->local_size; -
node_rank = node_comm_ptr->rank; + MPIU_Assert(node_comm_ptr != NULL); + node_size = node_comm_ptr->local_size; + node_rank = node_comm_ptr->rank; + } + else { /* pt2pt mode: node size/rank come from g_smpi and no node communicator is looked up. NOTE(review): node_comm_ptr is not assigned on this path -- confirm nothing later in this function dereferences it when shm_win_pt2pt is set. */ + mv2_init_rank_for_barrier(win_ptr); /* fills (*win_ptr)->shm_l2g_rank for MPIDI_CH3I_barrier_in_rma */ + node_size = g_smpi.num_local_nodes; + node_rank = g_smpi.my_local_id; + } MPIR_T_PVAR_TIMER_START(RMA, rma_wincreate_allgather); /* allocate memory for the base addresses, disp_units, and completion counters of all processes */ @@ -373,14 +476,25 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info * mpi_errno = MPIU_SHMW_Hnd_get_serialized_by_ref((*win_ptr)->shm_segment_handle, &serialized_hnd_ptr); if (mpi_errno) MPIU_ERR_POP(mpi_errno); - mpi_errno = MPIR_Shmem_Bcast_MV2(serialized_hnd_ptr, MPIU_SHMW_GHND_SZ, MPI_BYTE, 0, node_comm_ptr, &errflag); - if (mpi_errno) MPIU_ERR_POP(mpi_errno); - MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + if (likely(!(*win_ptr)->shm_win_pt2pt)) { + mpi_errno = MPIR_Shmem_Bcast_MV2(serialized_hnd_ptr, MPIU_SHMW_GHND_SZ, MPI_BYTE, 0, node_comm_ptr, &errflag); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + } else { + /* pt2pt mode: send the serialized window handle with point-to-point messages (tag SYNC_WIN_HND) instead of MPIR_Shmem_Bcast_MV2 */ + mpi_errno = send_sync_msgs(win_ptr, comm_size, serialized_hnd_ptr, SYNC_WIN_HND); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + } /* wait for other processes to attach to win */ - mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag); - if (mpi_errno) MPIU_ERR_POP(mpi_errno); - MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + if (likely(!(*win_ptr)->shm_win_pt2pt)) { + mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + } else { + mpi_errno = MPIDI_CH3I_barrier_in_rma(win_ptr, rank, node_size, comm_size); /* pt2pt stand-in for MPIR_Barrier_impl(node_comm_ptr) */ + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + } /* unlink shared memory region so it gets
deleted when all processes exit */ mpi_errno = MPIU_SHMW_Seg_remove((*win_ptr)->shm_segment_handle); @@ -389,9 +503,14 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info * } else { char serialized_hnd[MPIU_SHMW_GHND_SZ] = {0}; - mpi_errno = MPIR_Shmem_Bcast_MV2(serialized_hnd, MPIU_SHMW_GHND_SZ, MPI_BYTE, 0, node_comm_ptr, &errflag); - if (mpi_errno) MPIU_ERR_POP(mpi_errno); - MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + if (likely(!(*win_ptr)->shm_win_pt2pt)) { + mpi_errno = MPIR_Shmem_Bcast_MV2(serialized_hnd, MPIU_SHMW_GHND_SZ, MPI_BYTE, 0, node_comm_ptr, &errflag); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + } else { + mpi_errno = recv_sync_msgs(win_ptr, serialized_hnd, SYNC_WIN_HND); /* pt2pt mode: matches send_sync_msgs(..., SYNC_WIN_HND) */ + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + } mpi_errno = MPIU_SHMW_Hnd_deserialize((*win_ptr)->shm_segment_handle, serialized_hnd, strlen(serialized_hnd)); if (mpi_errno) MPIU_ERR_POP(mpi_errno); @@ -401,9 +520,14 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info * (char **)&(*win_ptr)->shm_base_addr, 0); if (mpi_errno) MPIU_ERR_POP(mpi_errno); - mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag); - if (mpi_errno) MPIU_ERR_POP(mpi_errno); - MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + if (likely(!(*win_ptr)->shm_win_pt2pt)) { + mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + } else { + mpi_errno = MPIDI_CH3I_barrier_in_rma(win_ptr, rank, node_size, comm_size); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + } } /* Allocated the interprocess mutex segment.
*/ @@ -424,14 +548,25 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info * mpi_errno = MPIU_SHMW_Hnd_get_serialized_by_ref((*win_ptr)->shm_mutex_segment_handle, &serialized_hnd_ptr); if (mpi_errno) MPIU_ERR_POP(mpi_errno); - mpi_errno = MPIR_Shmem_Bcast_MV2(serialized_hnd_ptr, MPIU_SHMW_GHND_SZ, MPI_CHAR, 0, node_comm_ptr, &errflag); - if (mpi_errno) MPIU_ERR_POP(mpi_errno); - MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + if (likely(!(*win_ptr)->shm_win_pt2pt)) { + mpi_errno = MPIR_Shmem_Bcast_MV2(serialized_hnd_ptr, MPIU_SHMW_GHND_SZ, MPI_CHAR, 0, node_comm_ptr, &errflag); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + } else { + /* pt2pt mode: send the serialized mutex handle with point-to-point messages (tag SYNC_WIN_MUTEX) instead of MPIR_Shmem_Bcast_MV2 */ + mpi_errno = send_sync_msgs(win_ptr, comm_size, serialized_hnd_ptr, SYNC_WIN_MUTEX); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + } /* wait for other processes to attach to win */ - mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag); - if (mpi_errno) MPIU_ERR_POP(mpi_errno); - MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + if (likely(!(*win_ptr)->shm_win_pt2pt)) { + mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + } else { + mpi_errno = MPIDI_CH3I_barrier_in_rma(win_ptr, rank, node_size, comm_size); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + } /* unlink shared memory region so it gets deleted when all processes exit */ mpi_errno = MPIU_SHMW_Seg_remove((*win_ptr)->shm_mutex_segment_handle); @@ -440,9 +575,15 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info * char serialized_hnd[MPIU_SHMW_GHND_SZ] = {0}; /* get serialized handle from rank 0 and deserialize it */ - mpi_errno = MPIR_Shmem_Bcast_MV2(serialized_hnd, MPIU_SHMW_GHND_SZ, MPI_CHAR, 0, node_comm_ptr,
&errflag); - if (mpi_errno) MPIU_ERR_POP(mpi_errno); - MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + if (likely(!(*win_ptr)->shm_win_pt2pt)) { + mpi_errno = MPIR_Shmem_Bcast_MV2(serialized_hnd, MPIU_SHMW_GHND_SZ, MPI_CHAR, 0, node_comm_ptr, &errflag); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + } else { + /* pt2pt mode: receive the serialized mutex handle sent by send_sync_msgs(..., SYNC_WIN_MUTEX) */ + mpi_errno = recv_sync_msgs(win_ptr, serialized_hnd, SYNC_WIN_MUTEX); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + } mpi_errno = MPIU_SHMW_Hnd_deserialize((*win_ptr)->shm_mutex_segment_handle, serialized_hnd, strlen(serialized_hnd)); if (mpi_errno) MPIU_ERR_POP(mpi_errno); @@ -452,9 +593,15 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info * (char **)&(*win_ptr)->shm_mutex, 0); if (mpi_errno) MPIU_ERR_POP(mpi_errno); - mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag); - if (mpi_errno) MPIU_ERR_POP(mpi_errno); - MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + if (likely(!(*win_ptr)->shm_win_pt2pt)) { + mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail"); + } else { + /* Pairs with the root's "wait for other processes to attach" barrier; the barrier alone orders this attach before the root's MPIU_SHMW_Seg_remove, so no extra delay is needed. */ + mpi_errno = MPIDI_CH3I_barrier_in_rma(win_ptr, rank, node_size, comm_size); + if (mpi_errno) MPIU_ERR_POP(mpi_errno); + } } /* compute the base addresses of each process within the shared memory segment */ diff --git a/src/mpid/ch3/include/mpidrma.h b/src/mpid/ch3/include/mpidrma.h index fe1b325..f17db99 100644 --- a/src/mpid/ch3/include/mpidrma.h +++ b/src/mpid/ch3/include/mpidrma.h @@ -144,6 +144,7 @@ int MPIDI_CH3I_RDMA_post(MPID_Win * win_ptr, int target_rank); int MPIDI_CH3I_RDMA_complete(MPID_Win * win_ptr, int start_grp_size, int *ranks_in_win_grp); int MPIDI_CH3I_RDMA_finish_rma(MPID_Win * win_ptr); int
MPIDI_CH3I_RDMA_finish_rma_target(MPID_Win *win_ptr, int target_rank);
+int MPIDI_CH3I_barrier_in_rma(MPID_Win **win_ptr, int rank, int node_size, int comm_size); /* node-local barrier used when shm_win_pt2pt is set */
 #endif /* defined(CHANNEL_MRAIL) */
 
 /*** RMA OPS LIST HELPER ROUTINES ***/
diff --git a/src/mpid/ch3/src/ch3u_rma_ops.c b/src/mpid/ch3/src/ch3u_rma_ops.c
index 7872f1b..0478e76 100644
--- a/src/mpid/ch3/src/ch3u_rma_ops.c
+++ b/src/mpid/ch3/src/ch3u_rma_ops.c
@@ -94,7 +94,7 @@ int MPIDI_Win_free(MPID_Win **win_ptr)
     if ((*win_ptr)->fall_back != 1) {
         MPIDI_CH3I_RDMA_win_free(win_ptr);
     }
-    if( (*win_ptr)->comm_ptr->ch.shmem_coll_ok == 1) {
+    if( (!(*win_ptr)->shm_win_pt2pt) && (*win_ptr)->comm_ptr->ch.shmem_coll_ok == 1) {
         free_2level_comm((*win_ptr)->comm_ptr);
     }
 #endif /* defined(CHANNEL_MRAIL) */
diff --git a/src/mpid/ch3/src/ch3u_rma_sync.c b/src/mpid/ch3/src/ch3u_rma_sync.c
index 11ed305..a9c8d9b 100644
--- a/src/mpid/ch3/src/ch3u_rma_sync.c
+++ b/src/mpid/ch3/src/ch3u_rma_sync.c
@@ -1070,6 +1070,7 @@ extern int limic_fd;
 
 #define SYNC_POST_TAG 100
 
+
 static int send_lock_msg(int dest, int lock_type, MPID_Win *win_ptr);
 static int send_unlock_msg(int dest, MPID_Win *win_ptr);
 /* static int send_flush_msg(int dest, MPID_Win *win_ptr); */
@@ -1247,20 +1248,39 @@ int MPIDI_Win_fence(int assert, MPID_Win *win_ptr)
         if (win_ptr->shm_allocated == TRUE) {
             MPID_Comm *node_comm_ptr = NULL;
 
 #if defined(CHANNEL_MRAIL) || defined(CHANNEL_PSM)
             MPI_Comm shmem_comm;
 
-            shmem_comm = win_ptr->comm_ptr->ch.shmem_comm;
+#if defined(CHANNEL_MRAIL)
+            /* A window created with shm_win_pt2pt set was allocated without
+             * looking up the shmem node communicator; skip it in that mode. */
+            if (likely(!win_ptr->shm_win_pt2pt))
+#endif
+            {
+                shmem_comm = win_ptr->comm_ptr->ch.shmem_comm;
 
-            MPID_Comm_get_ptr(shmem_comm, node_comm_ptr);
-            MPIU_Assert(node_comm_ptr != NULL);
+                MPID_Comm_get_ptr(shmem_comm, node_comm_ptr);
+                MPIU_Assert(node_comm_ptr != NULL);
+            }
 #else
             node_comm_ptr = win_ptr->comm_ptr->node_comm;
 #endif
             /* Ensure ordering of load/store operations. */
             OPA_read_write_barrier();
 
-            mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag);
+#if defined(CHANNEL_MRAIL)
+            if (likely(!win_ptr->shm_win_pt2pt))
+#endif
+            {
+                mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag);
+            }
+#if defined(CHANNEL_MRAIL)
+            else {
+                mpi_errno = MPIDI_CH3I_barrier_in_rma(&win_ptr, win_ptr->comm_ptr->rank,
+                                                      g_smpi.num_local_nodes,
+                                                      win_ptr->comm_ptr->local_size);
+            }
+#endif
             if (mpi_errno) {goto fn_fail;}
 
             /* Ensure ordering of load/store operations. */
diff --git a/src/mpid/ch3/src/mpid_rma.c b/src/mpid/ch3/src/mpid_rma.c
index 0c21b8e..7ea64d5 100644
--- a/src/mpid/ch3/src/mpid_rma.c
+++ b/src/mpid/ch3/src/mpid_rma.c
@@ -331,6 +331,7 @@ static int win_init(MPI_Aint size, int disp_unit, int create_flavor, int model,
     (*win_ptr)->use_rdma_path = 0;
     (*win_ptr)->use_direct_shm = 0;
     (*win_ptr)->shm_coll_comm_ref = -1;
+    (*win_ptr)->shm_win_pt2pt = 0;
 #endif /* defined(CHANNEL_MRAIL) */
 #if defined (CHANNEL_PSM)
     (*win_ptr)->outstanding_rma = 0;